请在下面找到使用侧输出和插槽组进行本地扩展的示例 .

package org.example

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/

import org.apache.flink.streaming.api.functions.ProcessFunction

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

/**

* This example shows an implementation of WordCount with data from a text socket.

* To run the example make sure that the service providing the text data is already up and running.

*

* To start an example socket text stream on your local machine run netcat from a command line,

* where the parameter specifies the port number:

*

* {{{

* nc -lk 9999

* }}}

*

* Usage:

* {{{

* SocketTextStreamWordCount

* }}}

*

* This example shows how to:

*

* - use StreamExecutionEnvironment.socketTextStream

* - write a simple Flink Streaming program in scala.

* - write and use user-defined functions.

*/

object SocketTextStreamWordCount {

def main(args: Array[String]) {

if (args.length != 2) {

System.err.println("USAGE:\nSocketTextStreamWordCount ")

return

}

val hostName = args(0)

val port = args(1).toInt

val outputTag1 = OutputTag[String]("side-1")

val outputTag2 = OutputTag[String]("side-2")

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.enableObjectReuse()

//Create streams for names and ages by mapping the inputs to the corresponding objects

val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")

val counts = text.flatMap {

_.toLowerCase.split("\\W+") filter {

_.nonEmpty

}

}

.process(new ProcessFunction[String, String] {

override def processElement(

value: String,

ctx: ProcessFunction[String, String]#Context,

out: Collector[String]): Unit = {

if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))

else ctx.output(outputTag2, String.valueOf(value))

}

})

val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)

val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

val output1 = sideOutputStream1.map {

(_, 1)

}.slotSharingGroup("map1")

.keyBy(0)

.sum(1)

val output2 = sideOutputStream2.map {

(_, 1)

}.slotSharingGroup("map2")

.keyBy(0)

.sum(1)

output1.print()

output2.print()

env.execute("Scala SocketTextStreamWordCount Example")

}

}

java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序?相关推荐

  1. Java中控制多线程顺序执行

    Java中控制多线程顺序执行 一.概述 二.普通示例 三.控制示例 3.1.设置线程优先级 3.2.使用线程类的join() 3.2.1.在主线程join() 3.2.2.在子线程join() 3.3 ...

  2. java 并行 执行进度_关于java:Java8流的顺序执行和并行执行产生不同的结果?

    在Java8中运行以下流示例: System.out.println(Stream .of("a","b","c","d" ...

  3. 如何保证消息队列里的数据顺序执行?

    使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里数据同步到其他系统做一些数据统计分析.同步MySQL的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来 ...

  4. 多线程顺序消费MySQL数据_关于MQ的几件小事(五)如何保证消息按顺序执行

    1.为什么要保证顺序 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常.举例: 比如通过mysql binlog进行两个数据库的数据 ...

  5. java代码块执行顺序_Java笔记 | Java代码块执行顺序测试

    最近笔试常常遇到考察Java代码块执行顺序的题目,网上查看博客错漏百出,特地自己测试了一下. 如有错漏,希望路过的大佬指出来,以便我进行更改. 先上代码吧! public class ClassA { ...

  6. Java的finally执行顺序_Java return和finally执行顺序

    在Java语法中,return表示当前执行的结束,finally则是在当前代码块一定会执行的代码块.如果return在finally在之前执行,finally也会执行吗?答案是肯定的! 但是,fina ...

  7. ES6 Promise 并行执行和顺序执行

    1.Promise.all 并行执行promise getA和getB并行执行,然后输出结果.如果有一个错误,就抛出错误 /*** 每一个promise都必须返回resolve结果才正确* 每一个pr ...

  8. java 判断顺序_通过指令码来判断Java代码的执行顺序(++问题与return和finally的问题)...

    问题 在<深入理解Java虚拟机>一书中遇到了如下代码: public int method() { int i; try { i = 1; return i; } catch (Exce ...

  9. java 如何判定消息已在队列_【05期】消息队列中,如何保证消息的顺序性?

    本文选自:advanced-java 作者:yanglbme 问:如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保 ...

最新文章

  1. shape file与coverage叠加的问题
  2. 【Groovy】编译时元编程 ( 利用注解进行 AST 语法树转换 | 定义注解并使用 GroovyASTTransformationClass 注明 AST 转换接口 | AST 转换接口实现 )
  3. JVM - 深入剖析字符串常量池
  4. Taro+react开发(48)taro中switchTab
  5. 使用python实现对于chineseocr的API调用
  6. CornerNet: Detecting Objects as Paired Keypoints
  7. java 数组 截取_Java成长孵化园---认识java(day09)
  8. 程序员笑话集锦之丈夫与妻子篇
  9. 深信服AC1100上网行为管理
  10. SAI绘制宇宙的翅膀
  11. pe卸载linux系统软件,Windows和Linux双系统下完美卸载linux
  12. Gym - 101572K Kayaking Trip 二分
  13. TI GEL文件作用
  14. 46招健脑秘笈,让你变得更聪明
  15. po模型+unittest测试
  16. 《C语言及程序设计》实践参考——学生成绩统计
  17. 西部之旅之------相机的选择
  18. TypeScript基础+进阶
  19. 2018年全国多校算法寒假训练营练习比赛(第五场)解题报告
  20. TMS320F280049C 学习笔记19 可配置逻辑块 (CLB) 软件配置

热门文章

  1. Java08-java语法基础(七)构造方法
  2. 6-12mysql库的操作
  3. ctsc2009 移民站选址
  4. 哥尼斯堡的“七桥问题” (欧拉回路,并查集)
  5. Vue项目启动webpack报错Module build failed: Error: No PostCSS Config found in......
  6. [pytorch、学习] - 5.8 网络中的网络(NiN)
  7. javascript --- [小练习]变量提升、优先级综合
  8. [译] 前端组件设计原则
  9. Language-Directed Hardware Design for Network Performance Monitoring——Marple
  10. 当安全遇到大数据 “永恒之蓝”也将无所遁形!