一、场景说明

当我们初始化一个Kafka生产者后(初始化流程可以查看《Kafka源码解析之生产者初始化流程》),通过该生产者将封装好的消息发送出去,示例代码仍然参考example模块下的Producer.java:

public class Producer extends Thread {  public Producer(String topic, Boolean isAsync) {        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        //初始化KafkaProducer        producer = new KafkaProducer<>(props);        this.topic = topic;        this.isAsync = isAsync;    }  public void run() {        int messageNo = 1;        while (true) {            String messageStr = "Message_" + messageNo;            long startTime = System.currentTimeMillis();            if (isAsync) { // Send asynchronously                //异步发送消息                producer.send(new ProducerRecord<>(topic,                    messageNo,                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));            }             ...            ++messageNo;        }    }}    

发送消息的过程中就必然需要集群的元数据,比如指定的Topic有多少分区,每个分区的Leader副本在哪个节点上等等。那么客户端是如何获取集群元数据的呢?下面通过图示+源码的方式详细分析一下这个流程。

二、获取元数据流程图

这里重点分析主线程和Sender线程的切换,集群元数据的获取是通过Sender线程完成的。

三、过程源码解析

1、KafkaProducer通过send方法最终调用了doSend方法,生产者生产的消息就是通过这个方法发送给客户端的,截取部分代码如下:

private FuturedoSend(ProducerRecord record, Callback callback) {    TopicPartition tp = null;    try {        throwIfProducerClosed();        // first make sure the metadata for the topic is available        ClusterAndWaitTime clusterAndWaitTime;        try {            //TODO 步骤一:同步等待获取元数据(在这个方法中主线程会阻塞,并唤醒sender线程)            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);        } catch (KafkaException e) {            if (metadata.isClosed())                throw new KafkaException("Producer closed while send in progress", e);            throw e;        }    }    ...}            

doSend方法中通过调用waitOnMetadata来同步等待获取元数据,会阻塞主线程,直到获取到元数据或者超时,该方法代码如下:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {    //拉取元数据,第一次进来是拉取不到的,metadata只有配置的bootstrap.servers信息    Cluster cluster = metadata.fetch();    if (cluster.invalidTopics().contains(topic))        throw new InvalidTopicException(topic);    //将指定的topic添加到元数据    metadata.add(topic);    //获取指定topic的分区数,第一次进来是没有topic的信息的,所以也获取不到分区数,partitionsCount为null    Integer partitionsCount = cluster.partitionCountForTopic(topic);    if (partitionsCount != null && (partition == null || partition < partitionsCount))        return new ClusterAndWaitTime(cluster, 0);    long begin = time.milliseconds();    long remainingWaitMs = maxWaitMs;    long elapsed;    //循环,直到获取集群元数据,或者超时    do {        if (partition != null) {            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);        } else {            log.trace("Requesting metadata update for topic {}.", topic);        }        metadata.add(topic);        //获取当前元数据的版本号        int version = metadata.requestUpdate();        //唤醒sender线程        sender.wakeup();        try {            //阻塞等待元数据            //退出等待有两种方式:1)达到等待的时间;2)被其他线程唤醒了            metadata.awaitUpdate(version, remainingWaitMs);        } catch (TimeoutException ex) {            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs            throw new TimeoutException(                    String.format("Topic %s not present in metadata after %d ms.",                            topic, maxWaitMs));        }        //如果线程被唤醒,说明已经可以获取到元数据了        cluster = metadata.fetch();        //花费的时间        elapsed = time.milliseconds() - begin;        //如果超时,抛异常        if (elapsed >= maxWaitMs) {            throw new TimeoutException(partitionsCount == null ?                    String.format("Topic %s not present in metadata after %d ms.",                            topic, maxWaitMs) :                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",                            partition, topic, partitionsCount, maxWaitMs));        }        metadata.maybeThrowExceptionForTopic(topic);        //计算剩余时间        remainingWaitMs = maxWaitMs - elapsed;        //获取分区数        partitionsCount = cluster.partitionCountForTopic(topic);    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));    return new ClusterAndWaitTime(cluster, elapsed);}

该方法中通过一个do...while循环不断尝试获取元数据,我们看几段重要的代码:

//获取当前元数据的版本号int version = metadata.requestUpdate();

a .获取当前元数据的版本号,是一个递增的值,客户端每更新一次元数据,就同时更新一次版本号;

//唤醒sender线程sender.wakeup();

b.唤醒Sender线程,集群的元数据就是通过Sender线程获取到的

//阻塞等待元数据metadata.awaitUpdate(version, remainingWaitMs);

c.阻塞主线程,等待更新元数据,结束等待的条件有两个:

  • 被其它线程唤醒

  • 达到等待时间

awaitUpdate方法的代码如下:

public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {    long currentTimeMs = time.milliseconds();    //最后期限=当前时间+等待时间    long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;    time.waitObject(this, () -> {        // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.        maybeThrowFatalException();        //直到最新的版本号>给定的版本号,方法返回        return updateVersion() > lastVersion || isClosed();    }, deadlineMs);    if (isClosed())        throw new KafkaException("Requested metadata update after close");}

该方法返回的条件是:updateVersion() > lastVersion,即更新后的version大于当前元数据的version。

2、既然唤醒了Sender线程来获取元数据,那就看一下它的run方法,内部调用了runOnce方法,代码如下:

void runOnce() {    //前面关于事务的代码先不看    ...    long currentTimeMs = time.milliseconds();    long pollTimeout = sendProducerData(currentTimeMs);    //TODO 真正执行网络操作的都是NetworkClient这个组件    // 包括发送请求,接收响应,处理响应    // 就是通过这个方法拉取的元数据    client.poll(pollTimeout, currentTimeMs);}

这里client的实现类是NetworkClient,是一个执行网络操作的组件,通过它的poll方法来获取元数据:

@Overridepublic Listpoll(long timeout, long now) {    //    ensureActive();    //List 如果abortedSends不为空,说明已经连接broker并获取了响应,直接处理    //第一次进来abortedSends为空,不走这个分支    if (!abortedSends.isEmpty()) {        // If there are aborted sends because of unsupported version exceptions or disconnects,        // handle them immediately without waiting for Selector#poll.        List responses = new ArrayList<>();        handleAbortedSends(responses);        completeResponses(responses);        return responses;    }    //TODO 步骤一:封装一个拉取元数据的请求    long metadataTimeout = metadataUpdater.maybeUpdate(now);    try {        //TODO 步骤二:发送请求,进行复杂的网络操作,这里用的就是java的NIO        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));    } catch (IOException e) {        log.error("Unexpected error during I/O", e);    }    // process completed actions    long updatedNow = this.time.milliseconds();    List responses = new ArrayList<>();    //将请求返回的响应放到responses集合    handleCompletedSends(responses, updatedNow);    //TODO 步骤三:处理响应,响应里面就会有我们需要的元数据    handleCompletedReceives(responses, updatedNow);    handleDisconnections(responses, updatedNow);    handleConnections();    handleInitiateApiVersionRequests(updatedNow);    handleTimedOutRequests(responses, updatedNow);    completeResponses(responses);    return responses;}

这个方法主要分三步:

  • 封装一个拉取元数据的请求

  • 向服务端发送请求,获取响应

  • 处理响应,获取响应中的集群元数据信息

看一下具体的代码:

步骤一:

//TODO 步骤一:封装一个拉取元数据的请求long metadataTimeout = metadataUpdater.maybeUpdate(now);

这里metadataUpdater的实现类是DefaultMetadataUpdater,是NetworkClient类的一个内部类,其maybeUpdate方法如下:

private long maybeUpdate(long now, Node node) {    //获取连接的NodeId    String nodeConnectionId = node.idString();    //判断网络连接是否已经建立好,如果已经建立好,执行下面的代码(第二次进来时网络已经建立好了)    //第一次进来,网络连接显然是没有建立好的    if (canSendRequest(nodeConnectionId, now)) {        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();        this.inProgressRequestVersion = requestAndVersion.requestVersion;        //构建一个拉取目标topic元数据的请求        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;        log.debug("Sending metadata request {} to node {}", metadataRequest, node);        //发送拉取元数据的请求        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);        return defaultRequestTimeoutMs;    }    if (isAnyNodeConnecting()) {        return reconnectBackoffMs;    }    //第一次进来由于没有建立好网络连接,走的是这个分支,初始化一个连接    if (connectionStates.canConnect(nodeConnectionId, now)) {        log.debug("Initialize connection to node {} for sending metadata request", node);        //初始化一个到给定节点的网络连接,其实只绑定了OP_CONNECT事件        initiateConnect(node, now);        return reconnectBackoffMs;    }    return Long.MAX_VALUE;}

当do...while循环第一次走到这个方法时,由于没有和给定节点建立连接,所以会先初始化一个网络连接;第二次进入到这个方法时,会执行下面的代码:

if (canSendRequest(nodeConnectionId, now)) {    Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();    this.inProgressRequestVersion = requestAndVersion.requestVersion;    //构建一个拉取目标topic元数据的请求    MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;    log.debug("Sending metadata request {} to node {}", metadataRequest, node);    //添加拉取元数据的请求    sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);    return defaultRequestTimeoutMs;}

先构建一个拉取元数据的MetadataRequest请求,然后通过sendInternalMetadataRequest方法将这个请求转为ClientRequest请求

 void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {    //创建一个拉取元数据的请求    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);    //保存要发送的请求    doSend(clientRequest, true, now);}

然后通过doSend方法将这个请求放到inFlightRequests里面,这里面保存的是已发送但是没有返回响应的请求,默认值最多保存5个请求,然后将这个请求放到发送队列等待Selector的poll方法处理。注意这里的Selector并不是JavaNIO中的那个Selector,而是kafka自己定义的。

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {    String destination = clientRequest.destination();    RequestHeader header = clientRequest.makeHeader(request.version());    if (log.isDebugEnabled()) {        int latestClientVersion = clientRequest.apiKey().latestVersion();        if (header.apiVersion() == latestClientVersion) {            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,                    clientRequest.correlationId(), destination);        } else {            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);        }    }    Send send = request.toSend(destination, header);    InFlightRequest inFlightRequest = new InFlightRequest(            clientRequest,            header,            isInternalRequest,            request,            send,            now);    //把这个拉取元数据的请求放到inFlightRequests里面,    // 这个里面存储的是已发送请求,但是未返回响应的请求,默认最多5个    this.inFlightRequests.add(inFlightRequest);    //把请求放到发送队列等待poll方法处理    selector.send(send);}

步骤二:

//TODO 步骤二:发送请求,进行复杂的网络操作this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

poll方法的部分代码如下,这里用的就是Java的NIO,其中nioSelector才是Java中的Selecotr对象:

@Overridepublic void poll(long timeout) throws IOException {    ...    /* check ready keys */    long startSelect = time.nanoseconds();    //获取已经准备好io的selectionKey(channel)个数    int numReadyKeys = select(timeout);    long endSelect = time.nanoseconds();    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {        //获取所有准备好的selectionKey        Set readyKeys = this.nioSelector.selectedKeys();        ...        // Poll from channels where the underlying socket has more data        //遍历selectionKey进行处理        pollSelectionKeys(readyKeys, false, endSelect);        // Clear all selected keys so that they are included in the ready count for the next select        readyKeys.clear();        //处理IO操作        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);        immediatelyConnectedKeys.clear();    } else {        madeReadProgressLastPoll = true; //no work is also "progress"    }    long endIo = time.nanoseconds();    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());    ...}

其中处理IO操作的是pollSelectionKeys方法,截取部分关键代码如下:

@Overridepublic void poll(long timeout) throws IOException {    ...    /* check ready keys */    long startSelect = time.nanoseconds();    //获取已经准备好io的selectionKey(channel)个数    int numReadyKeys = select(timeout);    long endSelect = time.nanoseconds();    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {        //获取所有准备好的selectionKey        Set readyKeys = this.nioSelector.selectedKeys();        // Poll from channels that have buffered data (but nothing more from the underlying socket)        if (dataInBuffers) {            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice            Set toPoll = keysWithBufferedRead;            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed            pollSelectionKeys(toPoll, false, endSelect);        }        //遍历selectionKey进行处理        pollSelectionKeys(readyKeys, false, endSelect);        // Clear all selected keys so that they are included in the ready count for the next select        readyKeys.clear();        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);        immediatelyConnectedKeys.clear();    } else {        madeReadProgressLastPoll = true; //no work is also "progress"    }    ...    //将stageReceives结构中的NetworkReceive对象放到completeReceive集合中    //stageReceives:Map>    //completeReceive:List    addToCompletedReceives();}

关键方法有pollSelectionKeysaddToCompletedReceives,其中pollSelectionKeys用来接收服务端返回的响应,并将响应封装成NetworkReceive保存到数据结构中;addToCompletedReceives用来将NetworkReceive对象放到特定的集合中,最后统一进行处理。

void pollSelectionKeys(Set selectionKeys,                       boolean isImmediatelyConnected,                       long currentTimeNanos) {    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {        //获取对应的Kafkachannel        KafkaChannel channel = channel(key);        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;        boolean sendFailed = false;        ...        try {            /* complete any connections that have finished their handshake (either normally or immediately) */            //如果key对应的是连接事件,走这个分支            if (isImmediatelyConnected || key.isConnectable()) {                /**                 * TODO 核心代码                 *  最后完成网络连接的代码,如果之前初始化的时候,没有完成网络连接,这里会完成网络连接                 */                if (channel.finishConnect()) {                    //连接成功后,把brokerId放到连接成功的集合中                    this.connected.add(channel.id());                    this.sensors.connectionCreated.record();                    SocketChannel socketChannel = (SocketChannel) key.channel();                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",                            socketChannel.socket().getReceiveBufferSize(),                            socketChannel.socket().getSendBufferSize(),                            socketChannel.socket().getSoTimeout(),                            channel.id());                } else {                    continue;                }            }            ...            //如果是接收返回的响应,走这个方法            attemptRead(key, channel);            ...                        //如果是发送数据,走这个分支            if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(                () -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {                Send send;                try {                    //TODO 往服务端发送消息                    //方法里面消息被发送出去,并移除OP_WRITE事件                    send = channel.write();                } catch (Exception e) {                    sendFailed = true;                    throw e;                }                if (send != null) {                    //TODO 将响应添加到completedSends                    this.completedSends.add(send);                    this.sensors.recordBytesSent(channel.id(), send.size());                }            }        ...}

这里接收返回响应的方法是attemptRead(key, channel),具体的逻辑是:如果KafkaChannel注册的是读事件,就从channel中不断地读取数据,并将NetworkReceive对象添加到stageReceive数据结构中,这是一个Map,key是KafkaChannel,value是一个NetworkReceive队列

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {    //如果是读请求    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)        && !explicitlyMutedChannels.contains(channel)) {        //接收服务端的响应(本质也是一个请求)        //NetworkReceive代表的就是服务端返回来的响应        NetworkReceive networkReceive;        while ((networkReceive = channel.read()) != null) {            madeReadProgressLastPoll = true;            //不断地读取数据,将这个响应放到stagedReceive队列中            addToStagedReceives(channel, networkReceive);        }        if (channel.isMute()) {            outOfMemory = true; //channel has muted itself due to memory pressure.        } else {            madeReadProgressLastPoll = true;        }    }}

而对于addToCompletedReceives方法,就是把上面的stageReceive数据结构转为List结构。

步骤三:

//TODO 步骤三:处理响应,响应里面就会有我们需要的元数据handleCompletedReceives(responses, updatedNow);

handleCompletedReceives方法代码如下,如果是关于元数据信息的响应,则执行handleCompletedMetadataResponse方法:

private void handleCompletedReceives(List responses, long now) {    //遍历completedReceives集合中的NetworkReceive    for (NetworkReceive receive : this.selector.completedReceives()) {        //获取brokerid        String source = receive.source();        //获取指定broker最后一个没有返回响应的请求        InFlightRequest req = inFlightRequests.completeNext(source);        //解析服务端返回的响应        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,            throttleTimeSensor, now);        ...        //TODO 如果是关于元数据信息的响应        if (req.isInternalRequest && body instanceof MetadataResponse)            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);        else            responses.add(req.completed(body, now));    }}

截取部分handleCompletedMetadataResponse方法,其主要作用就是更新元数据:

public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {    ...    if (response.brokers().isEmpty()) {        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());        this.metadata.failedUpdate(now, null);    //如果响应中有broker信息,则更新元数据    } else {        //TODO 更新元数据,注意这里调用的是ProducerMetadata的update方法        // 里面通过notifyALL()方法来唤醒前面等待的线程        this.metadata.update(inProgressRequestVersion, response, now);    }    inProgressRequestVersion = null;}

注意Metadataupdate方法:

public synchronized void update(int requestVersion, MetadataResponse response, long now) {    Objects.requireNonNull(response, "Metadata response cannot be null");    if (isClosed())        throw new IllegalStateException("Update requested after metadata close");    if (requestVersion == this.requestVersion)        this.needUpdate = false;    else        requestUpdate();    this.lastRefreshMs = now;    this.lastSuccessfulRefreshMs = now;    //更新元数据信息时,会将version值+1    this.updateVersion += 1;    String previousClusterId = cache.cluster().clusterResource().clusterId();    this.cache = handleMetadataResponse(response, topic -> retainTopic(topic.topic(), topic.isInternal(), now));    //获取响应中的cluster集群元数据    Cluster cluster = cache.cluster();    maybeSetMetadataError(cluster);    this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, now));    String newClusterId = cache.cluster().clusterResource().clusterId();    if (!Objects.equals(previousClusterId, newClusterId)) {        log.info("Cluster ID: {}", newClusterId);    }    //更新所有监听集群元数据的对象的元数据信息    clusterResourceListeners.onUpdate(cache.cluster().clusterResource());    log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);}

关键步骤:this.updateVersion += 1;更新了元数据的version,此时更新后的version > 未更新前的verison,前面阻塞等待元数据的方法就会返回,从而继续执行KafkaProducer.waitOnMetadata方法后面的逻辑:

//如果线程被唤醒,说明已经可以获取到元数据了cluster = metadata.fetch();...//重新获取分区数partitionsCount = cluster.partitionCountForTopic(topic);

当重新获取的分区数不为null时,退出do...while循环。

至此,客户端就获取了集群的元数据信息,继续执行KafkaProducer.doSend方法后面的逻辑,继续向服务端发送数据。

总结:

  • kafka获取集群元数据是通过Sender线程完成的

  • 在获取集群元数据的过程中,主线程会阻塞,直到拿到元数据或者等待超时

  • NetworkClient是Kafka进行网络操作的组件,拉取集群元数据的过程中进行了封装请求,发送请求和处理响应

  • 元数据的版本号在拉取集群元数据的过程中起到了至关重要的作用

  • Kafka网络通信采用了JavaNIO

tp5 聚合max获取不到string最大值_深入理解Kafka客户端之如何获取集群元数据相关推荐

  1. 如何获取大端中的数据_【软件】ProE中各种获取数据方式的区别

    更多精彩,请点击上方蓝字关注我们! 软件 ProE中各种获取数据方式的区别 输入特征:输入特征是通过数据共享功能从外部文件输入几何的,文件输入以后,会转换成proe软件能够识别的几何,称为一个特征发布 ...

  2. python获取所有a股股票代码_股票量化分析(一)获取A股列表

    2015年的股市是当下的热门话题,同事的朋友弄了一个简单的弹股吐槽单页面单日PV就能达30W+ ,相当于本博客一年的PV量.所以站在技术角度,这里也写几篇关于股票技术面的文章.首先本篇先从获取A股列表 ...

  3. 使用VMware VSphere WebService SDK进行开发 (七)——获取数据中心、集群、主机、虚拟机的目录结构

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. Java 数组 定义一个数组,获取数组中的最大值和最小值,奇数个数和偶数个数...

    /*** 定义一个数组,获取数组中的最大值和最小值 奇数个数和偶数个数* */ package com.xuyigang1234.chp01;public class Demo8 {public st ...

  5. php groupby后找最大时间,pandas获取groupby分组里最大值所在的行方法

    下面为大家分享一篇pandas获取groupby分组里最大值所在的行方法,具有很好的参考价值,希望对大家有所帮助.一起过来看看吧 pandas获取groupby分组里最大值所在的行方法 如下面这个Da ...

  6. java中定义一个数组,获取数组中的最大值和最小值

    思路 : 我们定义一个数组,再定义一个max变量 用来存在最大值  : 再定义一个min变量,用来存在最小值: 我们遍历数组,假如当前值大于max,就把当前值赋值给max; 假如当前值小于min,就把 ...

  7. YDOOK:Java: 获取 Long 数据类型的最大值和最小值

    YDOOK:Java: 获取 Long 数据类型的最大值和最小值 © YDOOK JY Lin 文章目录 YDOOK:Java: 获取 Long 数据类型的最大值和最小值 © YDOOK JY Lin ...

  8. numpy使用np.argmax函数获取一维数组中最大值所在的索引(index of largest value in numpy array with np.argmax)

    numpy使用np.argmax函数获取一维数组中最大值所在的索引(index of largest value in numpy array with np.argmax) 目录 numpy使用np ...

  9. java获取栈最大值_实现O(1)获取最大最小值的栈----java

    实现O(1)获取最大最小值的栈和队列----java 一.如何实现包含获取最小值函数的栈 问题:定义栈的数据结构,请在该类型中实现一个能够得到栈的最小元素的getMin函数.在该栈中,调用getMin ...

最新文章

  1. Elasticsearch之Search API
  2. hdfs.DFSClient: Exception in createBlockOutputStre
  3. SQLAlchemy 使用经验
  4. android @id和@+id的区别
  5. WF4.0实战(十一):邮件通知
  6. java.lang.String小测试
  7. canvas笔记-二次贝塞尔曲线与三次贝塞尔曲线的用法
  8. php 查询sybase 实例,php访问sybase16
  9. 计算机网络学习笔记(29. DNS概述)
  10. 你真的需要全栈开发吗?
  11. 停车场管理系统 java_使用java编写一个停车场管理系统
  12. IBM 100年 科技引导未来
  13. ROS中测试机器人里程计信息
  14. 一千行代码铸造Terminal文本编辑器
  15. 车标识别 YOLOv5 YOLOv3 支持奔驰宝马奥迪等车标
  16. 计算机的新技术未来发展趋势论文,计算机最新技术发展趋势毕业论文
  17. CEF浏览器 模拟鼠标点击
  18. 各大公司的大数据质量监控平台
  19. Android开发笔记(一百八十一)使用CameraX拍照
  20. oracle10g没有行列转换函数的替代方法(转)

热门文章

  1. 计算机前端学哪些好学,Web前端能干什么工作,好学吗
  2. pip更新出问题后再输入报No module named ‘pip‘错怎么处理
  3. Algorithm:递归思想及实例分析
  4. 多分类问题的另一种处理策略——softmax回归
  5. MPEG-4视频编码核心思想
  6. 条目十四《使用reserve来避免不必要的重新分配》
  7. 离线批量数据通道Tunnel的最佳实践及常见问题
  8. python round保留小数位_Python-其他-round()保留小数位时遇到的问题
  9. java ref object_深入探讨 java.lang.ref 包
  10. python datetime和字符串如何相互转化?