flink - kafka producer 实现多分区发送数据到consumer,进行消费_flinkkafkaproducer010-程序员宅基地

技术标签: flink  kafka  

    1. 先建立好分区的topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic flinkwrite5

建立分区为3 ,并行度为3 的producer

2.然后在flink中连接这个topic

treamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> stream = env.addSource(new SimpleStringGenerator());
        stream.addSink(new FlinkKafkaProducer010("flink_Producer", new SimpleStringSchema(), properties)).setParallelism(3);  
        env.execute() 

通过从写SourceFunction来实现数据的生产

 public static class StringInput implements SourceFunction<String> {

        long i = 0;
        boolean swith = true;

        String s= "fngkjdgndfngdf";
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            for(int k=0;k<5;k++) {
                ctx.collect("flink:"+s.substring(k,k+2) +" "+ s.substring(k,k+2));
               
            }
        }

        @Override
        public void cancel() {
            swith = false;
        }

    }

}

然后在新建一个类,用来消费这个producer

public class kafkaConsumer {

    public static void main(String args[]) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("group.id","consumer-group");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset","latest");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("flink_Producer", new SimpleStringSchema(), properties);
       // myConsumer.setStartFromEarliest();

        myConsumer.setStartFromLatest();
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("flink_Producer", 0), 23L);
        specificStartOffsets.put(new KafkaTopicPartition("flink_Producer", 1), 31L);
        specificStartOffsets.put(new KafkaTopicPartition("flink_Producer", 2), 43L);

        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
        DataStream<String> stream = env.addSource(myConsumer);

        stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out)
                    throws Exception {
                for(String word: value.split(" ")){
                    out.collect(word);
                }
            }
        });

        Properties properties1 = new Properties();
        properties1.setProperty("bootstrap.servers","localhost:9092");

        stream.print();
        System.out.println("teset");


        env.execute("kafka sink");
    }

}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_38472282/article/details/105169191

智能推荐

UTF-8 与 UTF-16编码详解_utf-8和utf-16-程序员宅基地

文章浏览阅读1.8w次,点赞25次,收藏109次。UTF-16是Unicode字符编码五层次模型的第三层:字符编码表(Character Encoding Form,也称为 "storage format")的一种实现方式。即把Unicode字符集的抽象码位映射为16位长的整数(即码元, 长度为2 Byte)的序列,用于数据存储或传递。Unicode字符的码位,需要1个或者2个16位长的码元来表示,因此这是一个变长表示。引用维基百科中对于UTF-16编码的解释我们可以知道,UTF-16最少也会用2 Byte来表示一个字符,因此没有办法兼容ASCII编码。_utf-8和utf-16

多线程和高并发介绍_多线程与高并发-程序员宅基地

文章浏览阅读2.9w次,点赞40次,收藏256次。多线程和高并发介绍文章目录多线程和高并发介绍前言一、什么是多线程?1.多线程介绍2.多线程实现原理?3.白话文解释多线程4.多线程存在的问题二、什么是高并发?1.高并发介绍2.如何提升系统的并发能力三、多线程和高并发总结前言本文主要是针对多线程和高并发的概念做了简单的描述,介绍了什么是多线程,什么是高并发,并且对多线程和高并发的关系做了比较描述。一、什么是多线程?1.多线程介绍 什么是多线程,首先看下百度百科对多线程的定义;多线程(multithreading)_多线程与高并发

uni-app中,文字超出隐藏并显示省略号(实现展开、收起全文)_uniapp 超出文本显示省略号-程序员宅基地

文章浏览阅读3.1w次,点赞14次,收藏58次。uni-app中,固定宽高,文字超出部分,隐藏并显示省略号。.topic_cont_text{ padding: 30upx; colof: #999; background: #E1FFFF; max-height: 130upx; overflow: hidden; word-break: break-all; /* break-all(允许在单词内换行。) */..._uniapp 超出文本显示省略号

Spring Bean循环依赖问题及解决_循环依赖调用-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏2次。类与类之间的依赖关系形成了闭环,就会导致循环依赖问题的产生。举例来说,假设存在两个服务类A和服务类B,如果A通过依赖注入的方式引用了B,且B通过依赖注入的方式引用了A,那么A和B之间就存在循环依赖。推广来说,如果涉及多个类,也存在这种依赖关系,那么也是循环依赖问题。循环依赖问题比较严重,有时会影响服务启动,有时会导致死循环调用(如果线上环境出现循环调用,会导致程序进入死循环,然后服务崩溃,进而导致用户请求无法响应,造成生产事故),应引起足够的重视。_循环依赖调用

php 查询 mysql 字段是否存在,尝试使用PHP检查MySQL数据库中是否已经存在用户名...-程序员宅基地

文章浏览阅读53次。I've looked at the many other posts that were similar to my issue and implemented their solutions (as far as I can tell) as exactly as I could. However, every time I execute this script, the code in t..._php 查询数据库表里某个字段里面是否有某个用户名

RuntimeError: CUDA error: an illegal memory access was encountered 错误解决方案-程序员宅基地

文章浏览阅读3.1w次。RuntimeError: CUDA error: an illegal memory access was encountered首先,大家先检查自己的网络的参数是否有问题,如果参数有问题会导致此问题。其次,博主遇到一个情况。在单GPU下开启时,eval阶段会报这种错误。torch.nn.DataParallel(net,device_ids=[0])在net eval之前加..._runtimeerror: cuda error: an illegal memory access was encountered

随便推点

基于 Spring Boot + Vue 实现的可视化拖拽编辑的大屏项目-程序员宅基地

文章浏览阅读1k次。大家好,今天给小伙伴们分享一个基于 SpringBoot + Vue 实现的可视化拖拽编辑的大屏项目;# 简介这个是一个开源的一个BI平台,酷炫大屏展示,能随时随地掌控业务动态,让每个决策都有数据支撑。多数据源支持,内置mysql、elasticsearch、kudu驱动,支持自定义数据集省去数据接口开发,支持17种大屏组件,不会开发,照着设计稿也可以制作大屏。三步轻松完成大屏设计:配置数据源--..._vue实现拖拽可视化

使用百度sdk定位相关参数设定_百度android sdk设置精度优先-程序员宅基地

文章浏览阅读3.3k次。使用百度sdk定位相关参数设定_百度android sdk设置精度优先

扩展坞可以把手机投到显示器吗_手机 篇二:给电脑配的 Type-C拓展坞 没想到手机也能用...-程序员宅基地

文章浏览阅读3.5k次。原标题:手机 篇二:给电脑配的 Type-C拓展坞 没想到手机也能用最近,凑单买配件,入手了一个名为EDAX牌子的 Type-C拓展坞,看介绍主要用于电脑,如新款MacBook和各类Type-C接口的本本。前几天给手机刷机,需要拷贝几个GB的固件到内存卡,但内存卡是龟速,突然想到,可以用OTG外接U盘,但手头没有OTG线。我想了会,这个 Type-C拓展坞 有USB接口,说不定能用。外观展示铝合金..._手机能通过扩展坞连接电脑显示器上吗

CRC16校验、CRC32校验_crc16和crc32区别-程序员宅基地

文章浏览阅读3.6k次,点赞2次,收藏9次。//CRC16校验unsigned short CRC16( unsigned char* puchMsg, unsigned short usDataLen);//CRC32实现函数unsigned int CRC32( unsigned char *buf, unsigned int len); //CRC16校验unsigned short CRC16( unsign..._crc16和crc32区别

编译OpenSSL 动态库/静态库以及运行时库的选择_openssl ms/do_ms 静态库-程序员宅基地

文章浏览阅读1.5k次。Windows下编译OpenSSL的方法 1、安装ActivePerl 初始化的时候,需要使用perl 2、使用VS下的Visual Studio 20xx Command Prompt进入控制台模式 3、解压缩openssl的包,通过cd命令切换到openssl的目录 4、执行:perl configure VC-WIN32 5、执行:ms/do_ms 6、选择不同的编译结果_openssl ms/do_ms 静态库

ERROR 1820 (HY000): You must SET PASSWORD before-程序员宅基地

文章浏览阅读44次。2019独角兽企业重金招聘Python工程师标准>>> ..._python 1820 you must

推荐文章

热门文章

相关标签