celery源码分析

本文环境python3.5.2,celery4.0.2,django1.10.x系列

celery的任务发送

在Django项目中使用了装饰器来包装待执行任务,

from celery import shared_task, app@shared_task
def add(x, y):return x + y@app.task(bind=True)
def debug_task(self):print('Request: {0!r}'.format(self.request))

此时分析一下,Task是怎样在celery中执行的。

首先,先看shared_task函数,

def shared_task(*args, **kwargs):"""Create shared task (decorator).This can be used by library authors to create tasks that'll workfor any app environment.                                                # 由shared_task装饰的任务可以被任何app调用Returns:~celery.local.Proxy: A proxy that always takes the task from thecurrent apps task registry.Example:>>> from celery import Celery, shared_task>>> @shared_task... def add(x, y):...     return x + y...>>> app1 = Celery(broker='amqp://')>>> add.app is app1True>>> app2 = Celery(broker='redis://')>>> add.app is app2True                                                    """def create_shared_task(**options):def __inner(fun):name = options.get('name')                                          # 从装饰器中获取是否传入name参数# Set as shared task so that unfinalized apps,# and future apps will register a copy of this task._state.connect_on_app_finalize(lambda app: app._task_from_fun(fun, **options))                                                                   # 将该task添加到全局变量中,当其他app调用该函数时会将该任务添加到app任务列表中,以此达到所有任务共享# Force all finalized apps to take this task as well.for app in _state._get_active_apps():                               # 获取所有app的弱引用if app.finalized:                                               # 是否任务初始化过with app._finalize_mutex:                                   # 获取线程锁app._task_from_fun(fun, **options)                      # 加载该任务# Return a proxy that always gets the task from the current# apps task registry.def task_by_cons():app = _state.get_current_app()                                  # 获取当前的appreturn app.tasks[name or app.gen_task_name(fun.__name__, fun.__module__)]                                                               # 根据task的name或者fun来获取对应的taskreturn Proxy(task_by_cons)                                          # 通过代理类实例化task_by_consreturn __inner                                                          # 返回被__innerif len(args) == 1 and callable(args[0]):                                    # 如果装饰器传入参数就1个并且是可调用的,即shared_task没有传入参数return create_shared_task(**kwargs)(args[0])                            # 直接调用该函数并传入该函数return create_shared_task(*args, **kwargs)                                  # 处理shared_task中的传入参数

按照示例中的无参数调用则返回了Proxy的实例,传入参数就是task_by_cons,此时查看一下Proxy类的实现,该类位于celery/local.py中,

class Proxy(object):"""Proxy to another object."""# Code stolen from werkzeug.local.Proxy.__slots__ = ('__local', '__args', '__kwargs', '__dict__')def __init__(self, local,args=None, kwargs=None, name=None, __doc__=None):object.__setattr__(self, '_Proxy__local', local)            # 将传入参数local设置到_Proxy__local属性中object.__setattr__(self, '_Proxy__args', args or ())        # 设置列表属性object.__setattr__(self, '_Proxy__kwargs', kwargs or {})    # 设置键值属性if name is not None:object.__setattr__(self, '__custom_name__', name)       if __doc__ is not None:object.__setattr__(self, '__doc__', __doc__)...def _get_current_object(self):"""Get current object.This is useful if you want the realobject behind the proxy at a time for performance reasons or becauseyou want to pass the object into a different context."""loc = object.__getattribute__(self, '_Proxy__local')        # 获取初始化传入的localif not hasattr(loc, '__release_local__'):                   # 如果没有__release_local__属性return loc(*self.__args, **self.__kwargs)               # 函数调用,将初始化的值传入调用该函数try:  # pragma: no cover# not sure what this is aboutreturn getattr(loc, self.__name__)                      # 获取当前__name__属性值except AttributeError:  # pragma: no coverraise RuntimeError('no object bound to {0.__name__}'.format(self))...def __getattr__(self, name):if name == '__members__':return dir(self._get_current_object())return getattr(self._get_current_object(), name)            # 获取obj的属性def __setitem__(self, key, value):self._get_current_object()[key] = value                     # 设置key valdef __delitem__(self, key):del self._get_current_object()[key]                         # 删除对应keydef __setslice__(self, i, j, seq):self._get_current_object()[i:j] = seq                       # 列表操作def __delslice__(self, i, j):del self._get_current_object()[i:j]def __setattr__(self, name, value):setattr(self._get_current_object(), name, value)            # 设置属性def __delattr__(self, name):delattr(self._get_current_object(), name)                   # 删除对应属性

只选取了部分属性分析如上,主要是根据传入的是否local是否是函数,或者包含release_local来判断是否是调用函数,或是获取属性来处理。

此时在初始化过程中,为每个app添加该任务时,会调用到app._task_from_fun(fun, **options),

    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):if not self.finalized and not self.autofinalize:raise RuntimeError('Contract breach: app not finalized')name = name or self.gen_task_name(fun.__name__, fun.__module__)         # 如果传入了名字则使用,否则就使用moudle name的形式base = base or self.Task                                                # 是否传入Task,否则用类自己的Task类 默认celery.app.task:Taskif name not in self._tasks:                                             # 如果要加入的任务名称不再_tasks中run = fun if bind else staticmethod(fun)                            # 是否bind该方法是则直接使用该方法,否则就置为静态方法task = type(fun.__name__, (base,), dict({'app': self,                                                    # 动态创建Task类实例'name': name,                                                   # Task的name'run': run,                                                     # task的run方法'_decorated': True,                                             # 是否装饰'__doc__': fun.__doc__,'__module__': fun.__module__,'__header__': staticmethod(head_from_fun(fun, bound=bind)),'__wrapped__': run}, **options))()                              # for some reason __qualname__ cannot be set in type()# so we have to set it here.try:task.__qualname__ = fun.__qualname__                            except AttributeError:passself._tasks[task.name] = task                                       # 将任务添加到_tasks任务中task.bind(self)  # connects task to this app                        # 调用task的bind方法绑定相关属性到该实例上autoretry_for = tuple(options.get('autoretry_for', ()))retry_kwargs = options.get('retry_kwargs', {})if autoretry_for and not hasattr(task, '_orig_run'):@wraps(task.run)def run(*args, **kwargs):try:return task._orig_run(*args, **kwargs)except autoretry_for as exc:raise task.retry(exc=exc, **retry_kwargs)task._orig_run, task.run = task.run, runelse:task = self._tasks[name]                                            # 否则获取该taskreturn task                                                             # 返回该task

其中task在默认情况下是celery.app.task:Task,在动态生成该实例后,滴啊用了task.bind(self)方法,

@classmethod
def bind(cls, app):was_bound, cls.__bound__ = cls.__bound__, Truecls._app = app                                          # 设置类的_app属性conf = app.conf                                         # 获取app的配置信息cls._exec_options = None  # clear option cacheif cls.typing is None:cls.typing = app.strict_typingfor attr_name, config_name in cls.from_config:          # 设置类中的默认值if getattr(cls, attr_name, None) is None:           # 如果获取该属性为空setattr(cls, attr_name, conf[config_name])      # 使用app配置中的默认值# decorate with annotations from config.if not was_bound:cls.annotate()from celery.utils.threads import LocalStackcls.request_stack = LocalStack()                    # 使用线程栈保存数据# PeriodicTask uses this to add itself to the PeriodicTask schedule.cls.on_bound(app)return app

此时在Django项目中调用该异步任务时,如下调用,

add.delay(1,2)

此时就是通过代理类获取task的delay方法,

def delay(self, *args, **kwargs):"""Star argument version of :meth:`apply_async`.Does not support the extra options enabled by :meth:`apply_async`.Arguments:*args (Any): Positional arguments passed on to the task.**kwargs (Any): Keyword arguments passed on to the task.Returns:celery.result.AsyncResult: Future promise."""return self.apply_async(args, kwargs)

此时直接调用了self.apply_async方法,

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,link=None, link_error=None, shadow=None, **options):"""Apply tasks asynchronously by sending a message.Arguments:args (Tuple): The positional arguments to pass on to the task.kwargs (Dict): The keyword arguments to pass on to the task.countdown (float): Number of seconds into the future that thetask should execute.  Defaults to immediate execution.eta (~datetime.datetime): Absolute time and date of when the taskshould be executed.  May not be specified if `countdown`is also supplied.expires (float, ~datetime.datetime): Datetime orseconds in the future for the task should expire.The task won't be executed after the expiration time.shadow (str): Override task name used in logs/monitoring.Default is retrieved from :meth:`shadow_name`.connection (kombu.Connection): Re-use existing broker connectioninstead of acquiring one from the connection pool.retry (bool): If enabled sending of the task message will beretried in the event of connection loss or failure.Default is taken from the :setting:`task_publish_retry`setting.  Note that you need to handle theproducer/connection manually for this to work.retry_policy (Mapping): Override the retry policy used.See the :setting:`task_publish_retry_policy` setting.queue (str, kombu.Queue): The queue to route the task to.This must be a key present in :setting:`task_queues`, or:setting:`task_create_missing_queues` must beenabled.  See :ref:`guide-routing` for moreinformation.exchange (str, kombu.Exchange): Named custom exchange to send thetask to.  Usually not used in combination with the ``queue``argument.routing_key (str): Custom routing key used to route the task to aworker server.  If in combination with a ``queue`` argumentonly used to specify custom routing keys to topic exchanges.priority (int): The task priority, a number between 0 and 9.Defaults to the :attr:`priority` attribute.serializer (str): Serialization method to use.Can be `pickle`, `json`, `yaml`, `msgpack` or any customserialization method that's been registeredwith :mod:`kombu.serialization.registry`.Defaults to the :attr:`serializer` attribute.compression (str): Optional compression methodto use.  Can be one of ``zlib``, ``bzip2``,or any custom compression methods registered with:func:`kombu.compression.register`.Defaults to the :setting:`task_compression` setting.link (~@Signature): A single, or a list of tasks signaturesto apply if the task returns successfully.link_error (~@Signature): A single, or a list of task signaturesto apply if an error occurs while executing the task.producer (kombu.Producer): custom producer to use when publishingthe task.add_to_parent (bool): If set to True (default) and the taskis applied while executing another task, then the resultwill be appended to the parent tasks ``request.children``attribute.  Trailing can also be disabled by default using the:attr:`trail` attributepublisher (kombu.Producer): Deprecated alias to ``producer``.headers (Dict): Message headers to be included in the message.Returns:~@AsyncResult: Promise of future evaluation.Raises:TypeError: If not enough arguments are passed, or too manyarguments are passed.  Note that signature checks maybe disabled by specifying ``@task(typing=False)``.kombu.exceptions.OperationalError: If a connection to thetransport cannot be made, or if the connection is lost.Note:Also supports all keyword arguments supported by:meth:`kombu.Producer.publish`."""if self.typing:try:check_arguments = self.__header__                   # 获取参数except AttributeError:  # pragma: no coverpasselse:check_arguments(*(args or ()), **(kwargs or {}))app = self._get_app()                                       # 获取当前app                         if app.conf.task_always_eager:                              # 如果该配置为truereturn self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options)  # 本地执行该任务并返回结果# add 'self' if this is a "task_method".if self.__self__ is not None:args = args if isinstance(args, tuple) else tuple(args or ())args = (self.__self__,) + argsshadow = shadow or self.shadow_name(args, kwargs, options)preopts = self._get_exec_options()                                  # 获取队列等信息options = dict(preopts, **options) if options else preopts          # 设置成字典类型return app.send_task(self.name, args, kwargs, task_id=task_id, producer=producer,link=link, link_error=link_error, result_cls=self.AsyncResult,shadow=shadow, task_type=self,**options)                                                                   # 调用app发送send_task

该方法比较复杂,主要是进行了组装待发送任务的任务的参数,如connection,queue,exchange,routing_key等,如果是配置了本地直接执行则本地执行直接返回结果,否则调用app实例的send_task发送任务。

def send_task(self, name, args=None, kwargs=None, countdown=None,eta=None, task_id=None, producer=None, connection=None,router=None, result_cls=None, expires=None,publisher=None, link=None, link_error=None,add_to_parent=True, group_id=None, retries=0, chord=None,reply_to=None, time_limit=None, soft_time_limit=None,root_id=None, parent_id=None, route_name=None,shadow=None, chain=None, task_type=None, **options):"""Send task by name.Supports the same arguments as :meth:`@-Task.apply_async`.Arguments:name (str): Name of task to call (e.g., `"tasks.add"`).result_cls (~@AsyncResult): Specify custom result class."""parent = have_parent = Noneamqp = self.amqp                                                    # 获取amqp实例task_id = task_id or uuid()                                         # 设置任务id,如果没有传入则生成任务idproducer = producer or publisher  # XXX compat                      # 生成这router = router or amqp.router                                      # 路由值,如果没有则使用amqp的routerconf = self.conf                                                    # 获取配置信息if conf.task_always_eager:  # pragma: no cover                      # 如果配置了本地执行则打印信息warnings.warn(AlwaysEagerIgnored('task_always_eager has no effect on send_task',), stacklevel=2)options = router.route(options, route_name or name, args, kwargs, task_type)           # 生成route信息if not root_id or not parent_id:parent = self.current_worker_task if parent:if not root_id:root_id = parent.request.root_id or parent.request.idif not parent_id:parent_id = parent.request.idmessage = amqp.create_task_message(task_id, name, args, kwargs, countdown, eta, group_id,expires, retries, chord,maybe_list(link), maybe_list(link_error),reply_to or self.oid, time_limit, soft_time_limit,self.conf.task_send_sent_event,root_id, parent_id, shadow, chain,)                                                                   # 生成任务信息if connection:producer = amqp.Producer(connection)                            # 如果有连接则生成生产者with self.producer_or_acquire(producer) as P:                       with P.connection._reraise_as_library_errors():self.backend.on_task_call(P, task_id)amqp.send_task_message(P, name, message, **options)         # 发送任务消息 result = (result_cls or self.AsyncResult)(task_id)                  # 生成异步任务实例if add_to_parent:if not have_parent:parent, have_parent = self.current_worker_task, Trueif parent:parent.add_trail(result)return result                                                       # 返回结果

至此一个任务就发送出去,等待着消费者消费掉任务。

worker消费task的概述

在分析celery的worker的启动过程中,最后开启了loop等待任务来消费,启动定义的回调函数就是on_task_received,

    def on_task_received(message):# payload will only be set for v1 protocol, since v2# will defer deserializing the message body to the pool.payload = Nonetry:type_ = message.headers['task']                # protocol v2        # 获取任务except TypeError:return on_unknown_message(None, message)                            # 如果解析失败except KeyError:try:payload = message.decode()                                      # 再次解析消息except Exception as exc:  # pylint: disable=broad-exceptreturn self.on_decode_error(message, exc)try:type_, payload = payload['task'], payload  # protocol v1        # 利用协议解析任务except (TypeError, KeyError):return on_unknown_message(payload, message)try:strategy = strategies[type_]                                        # 获取type_的对应stratepyexcept KeyError as exc:return on_unknown_task(None, message, exc)else:try:strategy(message, payload,promise(call_soon, (message.ack_log_error,)),promise(call_soon, (message.reject_log_error,)),callbacks,)                                                               # 处理获取的信息内容except InvalidTaskError as exc:return on_invalid_task(payload, message, exc)

至此,从Django应用客户端发送的消息就到达了启动的worker的进程并被消费掉。
大概的消费流程如下,
此时的strategies就是在consumer的task实例在启动start时,调用的update_strategies方法,

  def update_strategies(self):loader = self.app.loader                                                # app的加载器for name, task in items(self.app.tasks):                                # 遍历所有的任务self.strategies[name] = task.start_strategy(self.app, self)         # 将task的name设为key 将task调用的返回值作为keytask.__trace__ = build_tracer(name, task, loader, self.hostname,app=self.app)                         # 处理相关执行结果的函数

此时我们继续查看task.start_strategy函数,

def start_strategy(self, app, consumer, **kwargs):return instantiate(self.Strategy, self, app, consumer, **kwargs)    # 生成task实例

此时self.Strategy的默认值是celery.worker.strategy:default,

def default(task, app, consumer,info=logger.info, error=logger.error, task_reserved=task_reserved,to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,proto1_to_proto2=proto1_to_proto2):"""Default task execution strategy.Note:Strategies are here as an optimization, so sadlyit's not very easy to override."""hostname = consumer.hostname                                                    # 设置相关的消费者信息connection_errors = consumer.connection_errors                                  # 设置错误值_does_info = logger.isEnabledFor(logging.INFO)# task event related# (optimized to avoid calling request.send_event)eventer = consumer.event_dispatcher                                             events = eventer and eventer.enabledsend_event = eventer.sendtask_sends_events = events and task.send_eventscall_at = consumer.timer.call_atapply_eta_task = consumer.apply_eta_taskrate_limits_enabled = not consumer.disable_rate_limitsget_bucket = consumer.task_buckets.__getitem__handle = consumer.on_task_requestlimit_task = consumer._limit_taskbody_can_be_buffer = consumer.pool.body_can_be_bufferReq = create_request_cls(Request, task, consumer.pool, hostname, eventer)       # 返回一个请求类revoked_tasks = consumer.controller.state.revokeddef task_message_handler(message, body, ack, reject, callbacks,to_timestamp=to_timestamp):if body is None:body, headers, decoded, utc = (message.body, message.headers, False, True,)if not body_can_be_buffer:body = bytes(body) if isinstance(body, buffer_t) else bodyelse:body, headers, decoded, utc = proto1_to_proto2(message, body)           # 解析接受的数据req = Req(message,on_ack=ack, on_reject=reject, app=app, hostname=hostname,eventer=eventer, task=task, connection_errors=connection_errors,body=body, headers=headers, decoded=decoded, utc=utc,)                                                                           # 实例化请求if _does_info:info('Received task: %s', req)if (req.expires or req.id in revoked_tasks) and req.revoked():returnif task_sends_events:send_event('task-received',uuid=req.id, name=req.name,args=req.argsrepr, kwargs=req.kwargsrepr,root_id=req.root_id, parent_id=req.parent_id,retries=req.request_dict.get('retries', 0),eta=req.eta and req.eta.isoformat(),expires=req.expires and req.expires.isoformat(),)                                                                       # 如果需要发送接受请求则发送if req.eta:                                                                 # 时间相关处理try:if req.utc:eta = to_timestamp(to_system_tz(req.eta))else:eta = to_timestamp(req.eta, timezone.local)except (OverflowError, ValueError) as exc:error("Couldn't convert ETA %r to timestamp: %r. Task: %r",req.eta, exc, req.info(safe=True), exc_info=True)req.reject(requeue=False)else:consumer.qos.increment_eventually()call_at(eta, apply_eta_task, (req,), priority=6)else:if rate_limits_enabled:                                                 # 速率限制bucket = get_bucket(task.name)if bucket:return limit_task(req, bucket, 1)task_reserved(req)                                                      # if callbacks:[callback(req) for callback in callbacks] handle(req)                                                             # 处理接受的请求return task_message_handler

此时处理的handler就是在consumer初始化的时候传入的w.process_task,

def _process_task(self, req):"""Process task by sending it to the pool of workers."""try:req.execute_using_pool(self.pool)except TaskRevokedError:try:self._quick_release()   # Issue 877except AttributeError:pass

接着就会调用,req.execute_using_pool来执行该任务,该request位于create_request_cls中的Request类的方法,

class Request(base):def execute_using_pool(self, pool, **kwargs):task_id = self.id                                                       # 获取任务idif (self.expires or task_id in revoked_tasks) and self.revoked():       # 检查是否过期或者是否已经执行过raise TaskRevokedError(task_id)time_limit, soft_time_limit = self.time_limits                          # 获取时间result = apply_async(                                                   # 执行对应的func并返回结果trace,args=(self.type, task_id, self.request_dict, self.body,self.content_type, self.content_encoding),accept_callback=self.on_accepted,timeout_callback=self.on_timeout,callback=self.on_success,error_callback=self.on_failure,soft_timeout=soft_time_limit or default_soft_time_limit,timeout=time_limit or default_time_limit,correlation_id=task_id,)# cannot create weakref to None# pylint: disable=attribute-defined-outside-initself._apply_result = maybe(ref, result)return result

此时调用的apply_async其实就是pool.apply_async的方法,传入的执行方法就是trace_task_ret,

def trace_task(task, uuid, args, kwargs, request={}, **opts):"""Trace task execution."""try:if task.__trace__ is None:task.__trace__ = build_tracer(task.name, task, **opts)return task.__trace__(uuid, args, kwargs, request)                  # 调用在strategy更新时写入的方法except Exception as exc:return trace_ok_t(report_internal_error(task, exc), None, 0.0, None)def _trace_task_ret(name, uuid, request, body, content_type,content_encoding, loads=loads_message, app=None,**extra_request):app = app or current_app._get_current_object()                          # 获取appembed = Noneif content_type:accept = prepare_accept_content(app.conf.accept_content)args, kwargs, embed = loads(body, content_type, content_encoding, accept=accept,)else:args, kwargs, embed = bodyhostname = gethostname()request.update({'args': args, 'kwargs': kwargs,'hostname': hostname, 'is_eager': False,}, **embed or {})R, I, T, Rstr = trace_task(app.tasks[name],uuid, args, kwargs, request, app=app)        # 调用trace_task执行taskreturn (1, R, T) if I else (0, Rstr, T)
trace_task_ret = _trace_task_ret

在update_stragegy时传入的方法是,

task.__trace__ = build_tracer(name, task, loader, self.hostname,app=self.app)

build_tracer函数的部分解析是,

def build_tracer(name, task, loader=None, hostname=None, store_errors=True,Info=TraceInfo, eager=False, propagate=False, app=None,monotonic=monotonic, truncate=truncate,trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):fun = task if task_has_custom(task, '__call__') else task.run   # 获取task对应的run函数...def trace_task(uuid, args, kwargs, request=None):# R      - is the possibly prepared return value.# I      - is the Info object.# T      - runtime# Rstr   - textual representation of return value# retval - is the always unmodified return value.# state  - is the resulting task state.# This function is very long because we've unrolled all the calls# for performance reasons, and because the function is so long# we want the main variables (I, and R) to stand out visually from the# the rest of the variables, so breaking PEP8 is worth it ;)R = I = T = Rstr = retval = state = Nonetask_request = Nonetime_start = monotonic()...# -*- TRACE -*-try:R = retval = fun(*args, **kwargs) # 执行对应的函数state = SUCCESSexcept Reject as exc:...return trace_task

此时调用的fun函数就是task本来应该执行的函数,此时就执行了对应task并获得了函数执行的返回结果。
至此,一个简单的消息的发送和消费的过程就完成了。

本文总结

主要是讲述了一个task任务从客户端的发送过程,然后服务端获得任务后并消费掉该任务,从而完成任务的消费,虽然本文的分析略显粗略,只是大致描述了任务的发送和消费,其中很多细节没有一一分析,大家如有兴趣可自行分析。

celery源码分析-Task的初始化与发送任务相关推荐

  1. celery源码分析-worker初始化分析(下)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的worker启动 在上文中分析到了Hub类的初始化,接下来继续分析Pool类的 ...

  2. celery源码分析-wroker初始化分析(上)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery与Django的配合使用 首先,在安装有django的环境中创建一个django ...

  3. celery源码分析-定时任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的定时任务与Django配置 celery也可以执行定时任务来执行相关操作,ce ...

  4. celery源码分析:multi命令分析

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...

  5. linux源码分析之cpu初始化 kernel/head.s,linux源码分析之cpu初始化

    linux源码分析之cpu初始化 kernel/head.s 收藏 来自:http://blog.csdn.net/BoySKung/archive/2008/12/09/3486026.aspx l ...

  6. Spring IOC 容器源码分析 - 余下的初始化工作

    1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...

  7. nginx源码分析之网络初始化

    nginx作为一个高性能的HTTP服务器,网络的处理是其核心,了解网络的初始化有助于加深对nginx网络处理的了解,本文主要通过nginx的源代码来分析其网络初始化. 从配置文件中读取初始化信息 与网 ...

  8. nginx源码分析之模块初始化

    在nginx启动过程中,模块的初始化是整个启动过程中的重要部分,而且了解了模块初始化的过程对应后面具体分析各个模块会有事半功倍的效果.在我看来,分析源码来了解模块的初始化是最直接不过的了,所以下面主要 ...

  9. nginx源码分析—模块及其初始化

    Content 0. 序 1. nginx有哪些模块? 2. nginx如何描述这些模块? 2.1 模块数据结构 2.1.1 ngx_module_t结构 2.1.2 ngx_command_t结构 ...

最新文章

  1. 再来一次的C语言贪吃蛇小游戏(三)
  2. python多久可以精通_学Python需要多久能学会?精通Python需要多长时间?
  3. QCon演讲|闲鱼从零到千万DAU的应用架构演进
  4. 《Excel VBA实战技巧精粹》终于登场了
  5. 基于Java计算器 科学计算器与标准计算器相互转化
  6. 高数_第6章无穷级数__正项级数的性质_比值_比较_根值_极限审敛法
  7. qt中采用G.729A进行网络语音通话实验程序
  8. Win7下安装DirectShow
  9. electron-vue配合electron-release-server自动更新(完整版-详细版)
  10. [Vue.js] 使用 babel-polyfill 解决IE浏览器 正常使用
  11. CSDN技术主题月:实战解读移动信息安全技术
  12. Allow Arbitrary Loads in Web Content与Allow Arbitrary Loads配置
  13. 手把手带你学习微信小程序 —— 十(icon 标签【微信默认标签】)
  14. SQL数据库常用语句大全
  15. 使用Java Swing实现7个经典应用诠释算法精髓
  16. ann神经网络matlab,ann神经网络(深入浅出图神经网络 pdf)
  17. ThinkPad 64位操作系统使用VMware时遇到Vt未打开的错误报警
  18. a450j装深度linux,i7-4700HQ超强动力!华硕A450J笔记本评测
  19. 什么是图灵完备语言?
  20. 【转】RNN的神奇之处(The Unreasonable Effectiveness of Recurrent Neural Networks)

热门文章

  1. 全面升级!星环科技基础软件再升级,赋能数字中国建设
  2. 中国顶尖的技术社区们在一个群里,会聊什么…
  3. 强烈推荐一款Python可视化神器!
  4. 国行版HomePod售价2799元,本周五发售
  5. 「数据科学家」必备的10种机器学习算法
  6. SpringBoot 定时任务动态管理通用解决方案
  7. 为什么老外不愿意用 MyBatis?
  8. 面试官给我挖坑:URI中的 “//” 有什么用?
  9. 数据驱动的算法工程落地!
  10. 百度发布全功能AI平台!