在看这篇文章之前,我们先要了解过消费者并发消费的底层实现,因为消息消费重试都是在消费完成之后才会去判断是否需要对该消息进行重消费。那么什么时候需要对消息进行重新消费呢,比如说消费失败的时候,此时RockqtMQ不会让这个消息白白浪费掉,而是会让消费者能够有机会重新拿到这个消息对这个消息重新消费,而消息重试的原理其实就是延迟消息的一种运用,在一个消息消费失败的时候,消费者底层会把这个消息重新发送到broker端,并且这次的发送试一次延时消息的发送,然后当延时时间结束之后,消费者就会能够重新拉取到这个消息并对此进行重新消费,这就是RocketMQ实现消息重试的底层原理,具体的细节是如何实现的,我们下面直接去看其源码实现。

1.找出需要重试的消息

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

注意:对于消息重试机制来说,该机制只适用于并发消费的时候

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {// 默认是Integer.MAX_VALUE,该值需要配合设置批量消费去使用int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;switch (status) {// 消费成功case CONSUME_SUCCESS:// 对于批量消费,如果用户设置的ackIndex大于批量消费消息数,那么ackIndex = 消费数 - 1if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;// 统计消费成功的消息数量this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);// 统计消费失败的消息数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;// 消费失败,ackIndex = -1case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());// 这里有两种情况:// 1.在回调方法中返回了RECONSUME_LATER,表示此次消费失败,那么无论是单条消费还是批量消费都会对所有的消息进行回退// 2.在回调方法中返回了CONSUME_SUCCESS,表示此次消费成功,那么对于单条消费来说是不会对这条消息进行回退的,//   但是如果是批量消费,并且指定了ackIndex,就算是返回了CONSUME_SUCCESS,也会对索引的消息进行回退// 举个例子,如果用户设置了批量消费 3 条数据,回调方法的返回值是CONSUME_SUCCESS,ackIndex = 0// 第一次遍历 i = 0 + 1 = 1, 1 < 3?,条件成立,所以索引为0的消息就需要进行消息回退,i++// 第二次遍历 i = 1 + 1 = 2, 2 < 3?,条件成立,所以索引为1的消息就需要进行消息回退,i++// 第三次遍历 i = 2 + 1 = 3, 3 < 3?,条件不成立,所以索引为2的消息不需要进行消息回退,跳出循环// 也就是说对于批量消费,ackIndex的意思就是该索引本身及(从0开始)之后的消息都需要进行消息回退for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 向broker发起消息回退请求// 什么是消息回退?当消费失败之后,消费者会重新向broker发送一个延迟消息,当该消息到达到期时间的时候就又会被消费者所重新消费,到达了消费重试的目的boolean result = this.sendMessageBack(msg, context);// 请求失败,重试if (!result) {// 消息重消费次数 + 1msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);// 消息加入到响应失败集合msgBackFailed.add(msg);}}// 把响应失败的消息从consumeRequest中移除if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 把响应失败的消息延迟5s后重新放到消费服务线程中进行再次消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 把被真正消费成功的消息从msgTreeMap中移除,怎样才算真正的消费成功? 这里消费成功的 或者消费失败但是消息回退成功都算是真正的消费成功// 如果移除完msg之后msgTreeMap已经没有数据了,那么返回offset就等于当前ProcessQueue最大偏移量 + 1, 反之返回的offset就等于当前ProcessQueue最小偏移量long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新本地内存中该mq的已消费偏移量this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

当我们在消息监听回调中返回了CONSUME_SUCCESS,那么ackIndex就等于待消费的消息长度 - 1,此时for循环就不会进去了,也就是不会触发消息的重试,那么是不是就意味着返回CONSUME_SUCCESS就不会触发消息的重试了?答案并不一定,如果我们是批量消费的情况下,并且在消费上下文对象中手动指定了ackIndex,此时在这批消息中索引大于ackIndex的消息就会被重试,详细过程看上面代码的注释。而当我们在消息监听回调中返回了RECONSUME_LATER之后,不管我们在消费上下文对象中指定ackIndex的值是多少,都会把这批消息进行重试

2.开始消息重试

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 发送回退消息的延迟级别(因为发送回退消息也是发送延迟消息的一种)int delayLevel = context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 向broker返回消费ackthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}
/*** 向broker发送消息回退请求* @param msg   回退的msg* @param delayLevel    延迟级别* @param brokerName    broker组名* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException* @throws MQClientException*/
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 给broker发送一个消息重试请求String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {// 发送消息重试请求失败log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 重新给broker发送一次消息this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

这里是给broker发送一个消息重试的请求,如果这个请求发送失败,那么消费者就主动向broker再次发送一次消息,那么我们看一下broker是如何处理这个消息重试请求的

org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack

private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ConsumerSendMsgBackRequestHeader requestHeader =(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);this.executeConsumeMessageHookAfter(context);}// 根据消费者组名找到对应的SubscriptionGroupConfig对象SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return CompletableFuture.completedFuture(response);}// 判断当前broker是否有写权限if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");return CompletableFuture.completedFuture(response);}// 如果重试队列数 <= 0, 那么直接返回SUCCESSif (subscriptionGroupConfig.getRetryQueueNums() <= 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return CompletableFuture.completedFuture(response);}// 获取消息回退所投递的topic名称 = %RETRY% + groupNameString newTopic = MixAll.getRetryTopic(requestHeader.getGroup());// 得到的queueIdInt通常都等于0,也就是说对于每一个消费者组重试队列只有一个int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();int topicSysFlag = 0;if (requestHeader.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}// 为该消费者组创建消息回退的主题配置TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);// 条件成立:创建重试主题失败,直接响应SYSTEM_ERROR给客户端if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}// 条件成立:重试主题没有写权限,直接响应NO_PERMISSION给客户端if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));return CompletableFuture.completedFuture(response);}// 根据commitlog物理偏移量从commitlog文件中返回对应的消息对象MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());// 如果找不到对应的消息,直接返回SYSTEM_ERRORif (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("look message by offset failed, " + requestHeader.getOffset());return CompletableFuture.completedFuture(response);}final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);// 条件成立:表示消息第一次进行重试if (null == retryTopic) {// 把消息的主题设置到属性RETRY_TOPIC中MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}msgExt.setWaitStoreMsgOK(false);// 得到延迟级别int delayLevel = requestHeader.getDelayLevel();// 获取最大消息重试次数int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();// 从3.4.9版本开始取客户端传过来的最大消息重试次数if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();}// 条件成立:此时该消息回退次数已经到达了客户端指定的重试次数,或者延迟级别 < 0, 消息就会进入死信队列if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {// 得到死信队列的主题名称 = %DLQ% + groupNamenewTopic = MixAll.getDLQTopic(requestHeader.getGroup());// 通常queueIdInt都等于0,也就是说对于每一个消费者组来说死信队列的数量只有一个queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;// 创建死信队列的主题配置topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}} else {// 条件成立:如果用户的业务逻辑中没有对ConsumeConcurrentlyContext对象的delayLevelWhenNextConsume属性进行设置的话,delayLevel == 0if (0 == delayLevel) {// 消息第一次进来, delayLevel = 3 + 1 = 4delayLevel = 3 + msgExt.getReconsumeTimes();}// 设置回退消息的延迟级别msgExt.setDelayTimeLevel(delayLevel);}// 拷贝一个新的msg对象MessageExtBrokerInner msgInner = new MessageExtBrokerInner();// 设置死信主题 / 重试主题msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());// 重试次数 + 1msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 获取ORIGIN_MESSAGE_ID属性String originMsgId = MessageAccessor.getOriginMessageId(msgExt);// 设置ORIGIN_MESSAGE_ID属性,当第一次回退的时候UtilAll.isBlank(originMsgId) == true,该属性就设置为消息的原始IdMessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);// 把回退消息写入到commitlogCompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);return putMessageResult.thenApply((r) -> {if (r != null) {switch (r.getPutMessageStatus()) {case PUT_OK:String backTopic = msgExt.getTopic();String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (correctTopic != null) {backTopic = correctTopic;}this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;default:break;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(r.getPutMessageStatus().name());return response;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("putMessageResult is null");return response;});
}

1.根据要重试的消息的commitlog物理偏移量从commitlog文件中找到对应的消息

2.拷贝一个新的消息,并且该新的消息重试次数+1,主题是%RETRY% + 消费者组名,如果此时该消息的重试次数超过最大重试次数或者延迟级别小于0,那么就把这个新的消息写入到死信队列中

3.设置延迟级别,在原来的延迟级别的基础上 + 3

4.把这个新消息再一次写入到commitlog中

由于设置了延迟级别,所以会走延迟消息的发送逻辑,对于延迟消息的原理这里就要不讲了,所以总结也就是说当消费端认为这个消息消费失败的时候就会向broker发送一个消息重试请求,broker收到这个请求之后就会从commitlog中找到这个消息,然后拷贝一个新的消息,并且重试次数 +1,主题重置为%RETRY% + 消费者组名,然后把这个新消息再一次写入到commitlog中,而此时这个新消息会进入到延迟主题SCHEDULE_TOPIC_XXXX中,当到达了延迟时间的时候该消息就会再一次被发送到 %RETRY% + 消费者组名 这个主题,那么消费者只要把 %RETRY% + 消费者组名 这个主题进行订阅就可以重新获取到这个消息了,那么消费者是在什么时候订阅这个主题的呢?

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription

......switch (this.defaultMQPushConsumer.getMessageModel()) {// 广播case BROADCASTING:break;// 集群case CLUSTERING:// 给当前消费者订阅retry topic,为什么需要订阅一个retry topic? 这个topic是为了给消费者消费失败,然后对消息回退的时候进行重消费的作用// 当消费者消费失败的时候,就会进行消息回退,回退的消息所在的topic就是这个retry topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;
}

消费者刚启动的时候,会把订阅 %RETRY% + 消费者组名 这个主题,之后当这个主题有消息的时候就可以拉取到消息进行重新消费了

RocketMQ消费端消息回退(消费重试)机制源码解析相关推荐

  1. JVM-白话聊一聊JVM类加载和双亲委派机制源码解析

    文章目录 Java 执行代码的大致流程 类加载loadClass的步骤 类加载器和双亲委派机制 sun.misc.Launcher源码解析 Launcher实例化 Launcher 构造函数 双亲委派 ...

  2. RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

    文章目录 前言 流程解析 总结 前言 在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的. PullMes ...

  3. Android Handler消息机制源码解析

    好记性不如烂笔头,今天来分析一下Handler的源码实现 Handler机制是Android系统的基础,是多线程之间切换的基础.下面我们分析一下Handler的源码实现. Handler消息机制有4个 ...

  4. MapReduce的分片机制源码解析

    目录 一.分⽚的概念 二.分片大小的选择 三.源码解析 1)FileSplit源码解析 2)FileInputFormat源码解析 3)TextInputFormat源码解析 4) LineRecor ...

  5. RocketMQ:消息ACK机制源码解析

    消息消费进度 概述 消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可. 消息模式分为两种: 集群 ...

  6. Android View系列(二):事件分发机制源码解析

    概述 在介绍点击事件规则之前,我们需要知道我们分析的是MotionEvent,即点击事件,所谓的事件分发就是对MotionEvent事件的分发过程,即当一个MotionEvent生成以后,系统需要把这 ...

  7. Android qq消息气泡实现效果,BezierDemo源码解析-实现qq消息气泡拖拽消失的效果

    这篇文章中我们比较了DraggableFlagView和BezierDemo两个项目的区别,提到将对其中一个做源码分析,那么我们就来分析BezierDemo的源码吧,因为这个项目的源码最简单,可以更直 ...

  8. 聊聊Dubbo - Dubbo可扩展机制源码解析

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 在Dubbo可扩展机制实战中,我们了解了Dubbo扩展机制的一些概念,初探了Dubbo中LoadBalance的实现, ...

  9. Kubernetes的Device Plugin机制源码解析

    简介: Kubernetes 1.8 引入的Device Plugin机制,通过扩展的方式实现支持GPU.FPGA.高性能 NIC.InfiniBand等各种设备的集成.而Device Manager ...

  10. Guava Futures异步回调机制源码解析

    本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/51758194 1.前言 在前两篇文章中简单阐述了Java ...

最新文章

  1. C语言怎么实现单词下落,如何用c语言实现单词统计
  2. CCHP分布式能源技术在数据中心IDC的应用
  3. 如何根据sessionID获取session解决方案
  4. 移动并重命名2000个文件,Python,3秒
  5. [Python图像处理] 十九.图像分割之基于K-Means聚类的区域分割
  6. Windows中非常实用的命令
  7. Redux Vuex
  8. JAVA中的设计模式三(策略模式)
  9. 洛谷P3845-球赛【离散化,贪心】
  10. 使用 webstorm 写 typescript 的一些小技巧
  11. 如何使用alt键+数字键盘上的数字键打出特殊符号
  12. 20164319 刘蕴哲 Exp1 PC平台逆向破解
  13. rootkit的检测
  14. 财险产保险公司应用系统各子系统简介
  15. C语言printf输出格式总结
  16. 没有发生GC也进入了安全点?这段关于安全点的JVM源码有点意思!
  17. 百度地图如何去除 百度地图的logo
  18. 51单片机之——串口通信(含实现部分)
  19. 关于 nodejs-websocket 的 wss 设置
  20. 这一篇TCP总结,请务必收下!

热门文章

  1. 学习的四重境界,给上初中侄女,如何学习,如何定义社会人才
  2. build, version详解iOS App SwiftUI
  3. 470.用Rand7()实现Rand10()
  4. 329.矩阵中的最长递增路径
  5. 随机梯度下降法(SGD)
  6. 中调用view_在 View 上使用挂起函数
  7. Hadoop中的一些基本操作
  8. 机器学习相关博客收藏(KL 散度、信息熵、谱聚类、EM、Isolation Kernel、iForest、元学习、小样本学习、课程学习)
  9. 【2019牛客暑期多校训练营(第七场)E】Find the median【权值线段树】
  10. 频域卷积定理的证明 乘积的傅里叶变换等于分别做傅里叶变换的卷积乘1/2pi