2023-02-04
flink

Contents

I. Key Concepts
II. Required Environment
III. Environment Setup
1. Java development environment (JDK, Maven, IDEA) assumed ready
2. Enable archived logging in Oracle and grant the required privileges to a dedicated user
2.1 Connect to the database as DBA
2.2 Enable archived logging
2.3 Grant log-access privileges to the user
2.4 Enable supplemental logging
3. Enable binlog in MySQL
IV. Coding
V. Summary and Thoughts

I. Key Concepts

**For Flink's basic concepts and use cases, please consult the official documentation or search on your own; this series aims to get a Flink development environment up and running quickly and offers only my personal notes.** Official documentation: https://flink.apache.org/

What Flink is for

Official overview: Apache Flink is a powerful framework for developing and running many different kinds of applications. Its key features include unified batch and stream processing, sophisticated state management, event-time support, and exactly-once state consistency guarantees. Flink runs on a variety of resource managers, including YARN, Mesos, and Kubernetes, and can also be deployed standalone on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been shown to scale to thousands of cores with terabytes of state while sustaining high throughput and low latency; many demanding stream-processing applications around the world run on Flink.

Note

The scenario in this post is deliberately simple: Oracle stores the business data and MySQL backs a real-time reporting system; Flink CDC performs the ETL from Oracle with latency on the order of milliseconds to seconds.

In short, Flink is a big-data processing framework that can be embedded in all kinds of architectures and supports batch processing, stream processing, or both combined. Its greatest strength is stream processing (high throughput, low latency, high performance).


II. Required Environment

  1. JDK 1.8 or later
  2. Maven 3.6
  3. IDEA or another IDE
  4. Oracle with archived logging enabled and the required privileges granted to a dedicated user
  5. MySQL with binlog enabled
  6. The required Flink JARs

III. Environment Setup

1. Java development environment (JDK, Maven, IDEA) assumed ready

2. Enable archived logging in Oracle and grant the required privileges to a dedicated user

2.1 Connect to the database as DBA

```shell
# Log in as SYSDBA
sqlplus /nolog
conn /as sysdba
```

2.2 Enable archived logging

Set `db_recovery_file_dest_size` and `db_recovery_file_dest` to values appropriate for your environment.

```sql
-- Enable archived logging
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
```

Check that archived logging is now enabled:

```sql
archive log list;
```

2.3 Grant log-access privileges to the user

```sql
-- Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Grant privileges
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
```

2.4 Enable supplemental logging

Table-level configuration (create a simple test table yourself first):

```sql
ALTER TABLE flink.stu ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```

Database-level configuration:

```sql
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```
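To confirm the database-level setting took effect, a query along these lines can be run afterwards; a value of `YES` indicates minimal supplemental logging is on:

```sql
SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE;
```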

3. Enable binlog in MySQL

Create a user:

```sql
CREATE USER 'flink'@'%' IDENTIFIED BY 'flink';
```

Grant privileges:

```sql
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;
```
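The section title also requires binlog to be enabled on the server. On MySQL 5.7 that typically means adding something like the following to `my.cnf` and restarting (the `server-id` and log base name here are assumptions; MySQL 8.0 enables the binlog by default):

```ini
[mysqld]
server-id        = 1          # any unique, non-zero id
log_bin          = mysql-bin  # turns on binary logging
binlog_format    = ROW        # row-based events are what CDC needs
binlog_row_image = FULL
```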

IV. Coding

Open IDEA and create a Maven project, either an empty one or one generated from the Flink quickstart archetype. Maven dependencies:

```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.0</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.17.1</log4j.version>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table ecosystem -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table connectors and formats -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.17-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
```

A `DataStreamJob.java` example:

```java
package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        // MySQL CDC source (defined here for later use; this job only reads from Oracle)
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.108")
                .port(3306)
                .databaseList("school")                          // set captured database
                .tableList("school.statistics_guangzhi_student") // set captured table
                .username("flink")
                .password("flink")
                .serverTimeZone("UTC")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        Properties properties = new Properties();
        //properties.put("database.url", "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(FAILOVER=ON)(LOAD_BALANCE=OFF)(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.15.105)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.15.105)(PORT=1521)))(CONNECT_DATA=warehosetest)(SERVER=DEDICATED)))");
        properties.put("database.url", "jdbc:oracle:thin:@192.168.15.105:1521:warehosetest");

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("192.168.15.105")
                .port(1521)
                .database("warehosetest") // monitored database
                .schemaList("flink")      // monitored schema
                .tableList("flink.stu")   // monitored table
                .username("flink")
                .password("flink")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction)
           //.addSink(...)
           .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering
        env.execute();
    }
}
```

Warning

ORA-12514: TNS: listener does not currently know of service requested in connect descriptor

When Oracle cannot be reached via its SID, this error can be resolved by adding a `database.url` entry to the `debeziumProperties`, as done in the code above.

The sink part is still to be completed.
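Before wiring up a real sink, it helps to understand the JSON change events that `JsonDebeziumDeserializationSchema` emits: each record carries `before`/`after` payloads plus an `op` code. A minimal, Flink-free sketch of pulling a field out of such an event (the sample event string is hand-written for illustration, not captured output; use a real JSON library such as Jackson in production):

```java
public class ChangeEventDemo {
    // Extract the value of a top-level string field (e.g. "op") from a
    // Debezium-style JSON event. Naive string scan, fine for a demo.
    static String extractField(String json, String field) {
        String key = "\"" + field + "\":\"";
        int start = json.indexOf(key);
        if (start < 0) return null;      // field absent or not a string value
        start += key.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        // Hand-written example event; real events also carry source metadata.
        String event = "{\"before\":null,\"after\":{\"ID\":1,\"NAME\":\"tom\"},"
                     + "\"op\":\"c\",\"ts_ms\":1675500000000}";
        System.out.println(extractField(event, "op")); // "c" means create/insert
    }
}
```

A sink could branch on this `op` code to decide between INSERT, UPDATE, and DELETE statements against MySQL.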

Code repository: https://github.com/jingjianqian/flink

V. Summary and Thoughts

Feel free to discuss in the comments.


Author: Joker


Copyright notice: Unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting!