Kafka Consumer Interfaces

http://www.cnblogs.com/fxjwind/p/3794255.html

Kafka provides two versions of the consumer interface:

high-level

The high-level version is the simpler one: you do not need to manage offsets yourself, since it automatically reads the consumer group's last offset from ZooKeeper.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

There are a few caveats when you have multiple partitions and multiple consumers:
1. If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption of a single partition, so do not run more consumers than there are partitions.
2. If there are fewer consumers than partitions, one consumer will be assigned several partitions. Balance the consumer and partition counts carefully, otherwise data will be consumed unevenly across partitions.
    Ideally the partition count is an integer multiple of the consumer count, so the partition count matters; with 24 partitions, for example, it is easy to pick a consumer count that divides evenly.
3. If a consumer reads from multiple partitions, ordering across partitions is not guaranteed. Kafka only guarantees order within a single partition; across partitions, the observed order depends on how you read them.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.
5. The high-level API blocks when no data is available (see the sketch below for one way around this).
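
To illustrate item 5: a minimal sketch (my own, not from the original post; the class name is hypothetical) of how to stop the high-level consumer from blocking forever. Setting consumer.timeout.ms makes the iterator throw ConsumerTimeoutException when no message arrives within the timeout, instead of blocking indefinitely:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimeoutConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "pv");
        props.put("consumer.timeout.ms", "5000"); // give up after 5s with no data instead of blocking forever

        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("page_visits", new Integer(1));
        List<KafkaStream<byte[], byte[]>> streams =
                consumer.createMessageStreams(topicCountMap).get("page_visits");
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        try {
            while (it.hasNext()) {
                System.out.println("message: " + new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // no message within consumer.timeout.ms; fall through and shut down
        } finally {
            consumer.shutdown(); // unlike the blocking version, this line is actually reached
        }
    }
}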

Simple version

A simple gotcha: if your test flow is to produce some data first and then read it back with a consumer, remember to add the first property setting shown below.
The initial offset is invalid by default, and this setting controls how an invalid offset is corrected. The default is largest (i.e. latest), so without this setting you will not see the data you produced earlier. And once you have consumed with the default, adding smallest afterwards no longer helps: the offset is now valid and will not be corrected again, so you would have to reset it manually or with a tool.

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); // required if you want to read previously produced data
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);

String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaStream<byte[], byte[]> stream = streams.get(0);

ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    System.out.println("message: " + new String(it.next().message()));
}
if (consumer != null) consumer.shutdown();   // never actually reached, because hasNext() above blocks

Two handy tools when using the high-level consumer:

1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

This shows the current offset status of a group; here we check the pv group, which has 3 partitions:

Group           Topic                          Pid Offset          logSize         Lag             Owner
pv              page_visits                    0   21              21              0               none
pv              page_visits                    1   19              19              0               none
pv              page_visits                    2   20              20              0               none

The key columns are Offset, logSize and Lag.
Here everything has already been consumed, so Offset = logSize and Lag = 0.

2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

It takes 3 arguments:
[earliest | latest]: where to reset the offsets to
consumer.properties: path to the consumer config file
topic: the topic name, here page_visits

After running this command against the pv group above, checking the group offsets again gives:

Group           Topic                          Pid Offset          logSize         Lag             Owner
pv              page_visits                    0   0               21              21              none
pv              page_visits                    1   0               19              19              none
pv              page_visits                    2   0               20              20              none

The offsets have been reset to 0, and Lag = logSize.

Below is the complete multi-threaded consumer code from the original article.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // create the connector; the config is built in createConsumerConfig below
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) { // create the concurrent consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads)); // which topic to read and how many threads to use
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // create the streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // each thread gets one KafkaStream

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
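
The ConsumerTest worker submitted to the executor above is not reproduced in this post. A minimal sketch consistent with the original wiki example (each instance simply drains its own KafkaStream):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

// Worker that drains one KafkaStream; one instance per thread.
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) { // blocks while the stream is empty
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}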

SimpleConsumer

The other interface is SimpleConsumer. Despite the name, which suggests a simple API, it is actually the low-level consumer and the more complex interface.

Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

When should you use this interface?

  1. Read a message multiple times
  2. Consume only a subset of the partitions in a topic in a process
  3. Manage transactions to make sure a message is processed once and only once

Of course, this interface comes at a cost: partitions, brokers and offsets are no longer transparent to you. You have to manage them yourself and also handle broker leader changes, which is a lot of trouble.
So do not use it unless you really have to; avoid it if you can.

  1. You must keep track of the offsets in your application to know where you left off consuming.
  2. You must figure out which Broker is the lead Broker for a topic and partition
  3. You must handle Broker leader changes

Steps for using SimpleConsumer:

  1. Find an active Broker and find out which Broker is the leader for your topic and partition
  2. Determine who the replica Brokers are for your topic and partition
  3. Build the request defining what data you are interested in
  4. Fetch the data
  5. Identify and recover from leader changes

First, you must know which partition of which topic you want to read.
Then, find the broker that is the leader for that partition, and from it the brokers that hold replicas of the partition.
Next, build the request yourself and fetch the data.
Finally, remember to detect and handle broker leader changes.

Step by step:

Finding the Lead Broker for a Topic and Partition

The idea: iterate over the seed brokers, fetch the topic's metadata from each, then iterate over each partition's metadata; when we find the partition we are looking for, return it.
The returned PartitionMetadata.leader().host() gives us the lead broker.

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) { // try each seed broker
        SimpleConsumer consumer = null;
        try {
            // create a SimpleConsumer:
            // class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int,
            //                      val bufferSize: Int, val clientId: String)
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); // send the TopicMetadataRequest

            List<TopicMetadata> metaData = resp.topicsMetadata(); // get the topic metadata
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) { // iterate over each partition's metadata
                    if (part.partitionId() == a_partition) { // is this the partition we want?
                        returnMetaData = part;
                        break loop; // found it, stop searching
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                    + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null) consumer.close();
        }
    }
    return returnMetaData;
}
Finding Starting Offset for Reads

The main content of the request is a Map<TopicAndPartition, PartitionOffsetRequestInfo>.

TopicAndPartition is just a wrapper around the topic and partition.
PartitionOffsetRequestInfo is defined as:
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
The time parameter says where to start reading data; two useful values are:
kafka.api.OffsetRequest.EarliestTime(): the beginning of the data in the logs
kafka.api.OffsetRequest.LatestTime(): will only stream new messages

Do not assume the starting offset is always 0, because messages expire and get deleted.

The other parameter, maxNumOffsets, is the maximum number of offsets to return; the code below asks for just 1.

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); // build the offset fetch request info
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request); // fetch the offsets

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition); // the returned offsets
    return offsets[0]; // start reading from the first one
}
Reading the Data

First add a fetch to the FetchRequest, specifying the topic, partition, starting offset, and fetch size.
If the producer writes very large messages, the 100000 bytes specified here may not be enough and you will get back an empty message set; in that case increase the value until you get a non-empty message set (a sketch of such a retry follows the code below).

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000) // fetch size: 100000 bytes
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See Error Handling
}

numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) { // necessary check: for compressed messages the whole block is returned, so it may contain old messages
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset(); // advance readOffset to the next offset
    ByteBuffer payload = messageAndOffset.message().payload();

    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
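
The loop above does not handle the empty-message-set case mentioned before the code. A rough sketch of one way to retry with a growing fetch size (my own; the doubling strategy and the 10 MB cap are arbitrary choices, error handling is omitted, and consumer, clientName, a_topic, a_partition and readOffset are assumed to be defined as in the surrounding example):

// Sketch: grow the fetch size until the broker returns a non-empty message set.
int fetchSize = 100000;
kafka.javaapi.message.ByteBufferMessageSet messages;
while (true) {
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    messages = fetchResponse.messageSet(a_topic, a_partition);
    if (messages.iterator().hasNext() || fetchSize >= 10 * 1024 * 1024) {
        break; // got data, or stop growing at 10 MB
    }
    fetchSize *= 2; // the next message is bigger than fetchSize; retry with a larger request
}
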
Error Handling
if (fetchResponse.hasError()) {
    numErrors++;
    // Something went wrong!
    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;
    if (code == ErrorMapping.OffsetOutOfRangeCode()) { // handle an out-of-range offset by resetting to the latest offset
        // We asked for an invalid offset. For simple case ask for the last element to reset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); // look up the new lead broker
    continue;
}

There is no special logic here: it simply calls findLeader again to obtain the new lead broker.
A sleep is added so that if the leader information is temporarily unavailable during the switchover, we wait and retry instead of failing immediately.

private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through if the leader hasn't changed give ZooKeeper a second to recover
            // second time, assume the broker did recover before failover, or it was a non-Broker issue
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
Full Source Code
package com.test.simple;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
