序言 以官网的例子为起点,选用Kafka为source和sink ,了解下批流统一的使用cuiyaonan2000@163.com
参考资料:
- Kafka | Apache Flink----表连接器
- JSON | Apache Flink----表格式器
DEMO 如下是官网创建Kafka的SQL,后面都是针对该SQL开始的.据说会了Kafka,我们就可以掌握Hive,Haddop所以一个药引子的作用非常大cuiyaonan2000@163.com
package cui.yao.nan.flink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import java.nio.file.FileSystem;/** * @Author: cuiyaonan2000@163.com * @Description: todo * @Date: Created at 2022-3-2416:13 */public class Test2 {public static void main(String[] args) throws Exception {//创建流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);TableResult tableResult = tableEnv.executeSql("CREATE TABLE jjjk (" +"myoffsetBIGINT METADATA FROM 'offset' VIRTUAL," +"mypartitionBIGINT METADATA FROM 'partition' VIRTUAL," +"id BIGINT," +"name STRING," +"age BIGINT " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'topic-name-cui'," +" 'properties.bootstrap.servers' = '172.17.15.2:9092'," +" 'properties.group.id' = 'testGroup'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'format' = 'json'," +" 'json.fail-on-missing-field' = 'false'," +" 'json.ignore-parse-errors' = 'true'" +")");Table table = tableEnv.sqlQuery("select id,name,age,mypartition,myoffset From jjjk");// 将该视图结果在转成一个流DataStream resultStream = tableEnv.toDataStream(table);// add a printing sink and execute in DataStream APIresultStream.print();env.execute();}}
【Flink的批流统一:Ⅴ】
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
