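Building a Real-Time Data Warehouse and OLAP Visualization (based on MySQL; replacing Maxwell with SeaTunnel makes real-time synchronization of heterogeneous data sources quick to set up)
1. OLAP visualization (component versions must be made compatible in advance)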
| Component | Linux121 | Linux122 | Linux123 | |
|---|---|---|---|---|
| jupyter | ? | |||
| spark | ? | ? | ? | |
| python3+SuperSet3.0 | ? | |||
| hive | ? | |||
| ClickHouse | ? | |||
| Kafka | ? | ? | ? | |
| Phoenix | ? | |||
| DataX | ? | |||
| maxwell | ? | |||
| Hadoop | ? | ? | ? | |
| MySQL | ? | |||
| ZK | ? | ? | ? | |
| HBASE | ? | ? | ? |
1.1 Install VMware and create the virtual machine cluster
1.1.1 Install VMware (VMware-workstation-full-15.5.5-16285975)
License key:
UY758-0RXEQ-M81WP-8ZM7Z-Y3HDA
1.1.2 Install CentOS 7
Set the root password to 123456 during installation.
1.1.3 Configure a static IP
vi /etc/sysconfig/network-scripts/ifcfg-ens33

:wq
systemctl restart network
ip addr

ping www.baidu.com
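Take a VM snapshot.
Install the JDK
mkdir -p /opt/lagou/software   # directory for installation packages
mkdir -p /opt/lagou/servers    # directory for installed software
rpm -qa | grep java
Remove the packages listed above, for example:
sudo yum remove java-1.8.0-openjdk
Upload jdk-8u421-linux-x64.tar.gz
chmod 755 jdk-8u421-linux-x64.tar.gz
Extract the archive into /opt/lagou/servers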
tar -zxvf jdk-8u421-linux-x64.tar.gz -C /opt/lagou/servers
cd /opt/lagou/servers
ll
Configure environment variables
vi /etc/profile
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
source /etc/profile
java -version
1.1.4 Install Xmanager
Connect to 192.168.49.121:22
Password: 123456
1.1.5 Clone two more machines and configure them
vi /etc/sysconfig/network-scripts/ifcfg-ens33

systemctl restart network
ip addr
hostnamectl
hostnamectl set-hostname linux121
Disable the firewall
systemctl status firewalld
systemctl stop firewalld
systemctl disable firewalld
Disable SELinux
vi /etc/selinux/config
SELINUX=disabled
Passwordless SSH login among the three machines
vi /etc/hosts

192.168.49.121 linux121
192.168.49.122 linux122
192.168.49.123 linux123

Step 1: run ssh-keygen -t rsa on centos7-1 (linux121), centos7-2 (linux122), and centos7-3 (linux123) to generate a public/private key pair on each
ssh-keygen -t rsa
Step 2: on each of the three machines, copy the public key to all three hosts
ssh-copy-id linux121
ssh-copy-id linux122
ssh-copy-id linux123
Step 3: on centos7-1 (linux121), distribute the combined authorized_keys file
scp /root/.ssh/authorized_keys linux121:/root/.ssh/
scp /root/.ssh/authorized_keys linux122:/root/.ssh/
scp /root/.ssh/authorized_keys linux123:/root/.ssh/
Clock synchronization on the three machines
sudo cp -a /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
sudo yum clean all
sudo yum makecache
sudo yum install ntpdate
ntpdate us.pool.ntp.org
crontab -e
*/1 * * * * /usr/sbin/ntpdate us.pool.ntp.org;
Take a VM snapshot.
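1.2 Install the ZooKeeper, Hadoop, and HBase clusters, and MySQL
1.2.1 Install the Hadoop cluster (version 2.7.3 recommended)
Create the directories under /opt
mkdir -p /opt/lagou/software   # directory for installation packages
mkdir -p /opt/lagou/servers    # directory for installed software
Upload the Hadoop archive to /opt/lagou/software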
https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/
hadoop-2.7.3.tar.gz

On linux121:
tar -zxvf hadoop-2.7.3.tar.gz -C /opt/lagou/servers
ll /opt/lagou/servers/hadoop-2.7.3
yum install -y vim
Add environment variables
vim /etc/profile
##HADOOP_HOME
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.7.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
source /etc/profile
hadoop version
HDFS cluster configuration
cd /opt/lagou/servers/hadoop-2.7.3/etc/hadoop
vim hadoop-env.sh
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
vim core-site.xml
<!-- Address of the HDFS NameNode -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://linux121:9000</value>
</property>
<!-- Directory for files that Hadoop generates at run time -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/lagou/servers/hadoop-2.7.3/data/tmp</value>
</property>
vim slaves
linux121
linux122
linux123
vim mapred-env.sh
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
mv mapred-site.xml.template mapred-site.xml
vim mapred-site.xml
<!-- Run MapReduce on YARN -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
vi mapred-site.xml
Add the following to the file.
<!-- JobHistory server address -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>linux121:10020</value>
</property>
<!-- JobHistory server web UI address -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>linux121:19888</value>
</property>
vim yarn-env.sh
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
vim yarn-site.xml
<!-- Host of the YARN ResourceManager -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>linux123</value>
</property>
<!-- How reducers fetch their data -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
vi yarn-site.xml
Add the following to the file.
<!-- Enable log aggregation -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- Keep aggregated logs for 7 days -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://linux121:19888/jobhistory/logs</value>
</property>
chown -R root:root /opt/lagou/servers/hadoop-2.7.3
Distribute the configuration
Install rsync on all three machines
sudo yum install -y rsync
touch rsync-script
vim rsync-script
#!/bin/bash
# 1. Exit if no argument was given
paramnum=$#
if ((paramnum == 0)); then
  echo "no params"
  exit 1
fi
# 2. Take the file name from the argument
p1=$1
file_name=$(basename "$p1")
echo "fname=$file_name"
# 3. Resolve the absolute path of the parent directory
pdir=$(cd -P "$(dirname "$p1")" && pwd)
echo "pdir=$pdir"
# 4. Get the current user name
user=$(whoami)
# 5. Run rsync against linux121..linux123
for ((host = 121; host < 124; host++)); do
  echo "------------------- linux$host --------------"
  rsync -rvl "$pdir/$file_name" "$user@linux$host:$pdir"
done
chmod 777 rsync-script
./rsync-script /home/root/bin
./rsync-script /opt/lagou/servers/hadoop-2.7.3
./rsync-script /opt/lagou/servers/jdk1.8.0_421
./rsync-script /etc/profile
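The distribution loop in rsync-script can also be previewed without touching any host. The following is a minimal Python sketch, not part of the original setup: `build_rsync_commands` is a hypothetical helper that only builds the same `rsync -rvl` command lines for linux121 to linux123 and prints them as a dry run. Unlike the script's `cd -P`, it does not resolve symlinks.

```python
import posixpath
import shlex


def build_rsync_commands(path, user, hosts):
    """Return one rsync argument list per host for the given path.

    Hypothetical helper: mirrors the rsync-script logic, but only builds
    the commands instead of running them.
    """
    # Normalize to an absolute path (symlinks are not resolved here).
    abs_path = posixpath.abspath(path)
    parent, name = posixpath.split(abs_path)
    commands = []
    for host in hosts:
        source = posixpath.join(parent, name)
        destination = f"{user}@{host}:{parent}"
        commands.append(["rsync", "-rvl", source, destination])
    return commands


if __name__ == "__main__":
    hosts = [f"linux{n}" for n in range(121, 124)]
    for cmd in build_rsync_commands("/opt/lagou/servers/hadoop-2.7.3", "root", hosts):
        # Dry run: print the command line instead of executing it.
        print(shlex.join(cmd))
```

Passing each printed list to `subprocess.run` would perform the actual copy; printing first makes it easy to check the destination directory before anything is overwritten.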
Format the NameNode on linux121
hadoop namenode -format
ssh localhost
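Start the whole cluster
cd $HADOOP_HOME/sbin
start-dfs.sh

If the DataNodes fail to start, clear the data directory, reformat, and start again (this erases all HDFS data):
sudo rm -rf /opt/lagou/servers/hadoop-2.7.3/data/tmp/*
hadoop namenode -format
sbin/start-dfs.sh
Note: the NameNode (linux121) and the ResourceManager (linux123) are on different machines. Do not start YARN on the NameNode; start it on the machine that hosts the ResourceManager.
sbin/start-yarn.sh
On linux121: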
cd /opt/lagou/servers/hadoop-2.7.3
sbin/mr-jobhistory-daemon.sh start historyserver
Web UIs:
HDFS:
http://linux121:50070/dfshealth.html#tab-overview
Job history logs:
http://linux121:19888/jobhistory
cd /opt/lagou/servers/hadoop-2.7.3
sbin/mr-jobhistory-daemon.sh stop historyserver
stop-yarn.sh
stop-dfs.sh
Test
hdfs dfs -mkdir /wcinput
cd /root/
touch wc.txt
vi wc.txt
hadoop mapreduce yarn
hdfs hadoop mapreduce
mapreduce yarn lagou
lagou
lagou
Save and exit
:wq!
hdfs dfs -put wc.txt /wcinput
cd $HADOOP_HOME
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /wcinput /wcoutput
1.2.2 Install the ZooKeeper cluster
Upload and extract zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
Edit the configuration file and create the data and log directories
# Create the ZooKeeper data directory
mkdir -p /opt/lagou/servers/zookeeper-3.4.14/data
# Create the ZooKeeper log directory
mkdir -p /opt/lagou/servers/zookeeper-3.4.14/data/logs
# Edit the ZooKeeper configuration
cd /opt/lagou/servers/zookeeper-3.4.14/conf
# Rename the sample file
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# Update dataDir
dataDir=/opt/lagou/servers/zookeeper-3.4.14/data
# Add dataLogDir
dataLogDir=/opt/lagou/servers/zookeeper-3.4.14/data/logs
# Add the cluster members
## server.<server ID>=<server address>:<peer communication port>:<leader election port>
server.1=linux121:2888:3888
server.2=linux122:2888:3888
server.3=linux123:2888:3888
# Uncomment this line
# ZooKeeper can purge old transaction logs and snapshots automatically; this parameter sets the purge interval in hours
autopurge.purgeInterval=1
cd /opt/lagou/servers/zookeeper-3.4.14/data
echo 1 > myid
Distribute the installation and set myid on each node
cd /opt/lagou/servers/hadoop-2.7.3/etc/hadoop
./rsync-script /opt/lagou/servers/zookeeper-3.4.14
Set myid on linux122
echo 2 >/opt/lagou/servers/zookeeper-3.4.14/data/myid
Set myid on linux123
echo 3 >/opt/lagou/servers/zookeeper-3.4.14/data/myid
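Each node's myid must equal the N in its `server.N=` line in zoo.cfg. As a quick consistency check, here is a small Python sketch, assuming the three hosts and the 2888/3888 ports used above. `zk_server_lines` is a hypothetical helper, not a ZooKeeper tool.

```python
def zk_server_lines(hosts, peer_port=2888, election_port=3888):
    """Map each host to its (myid, zoo.cfg server line) pair.

    Hypothetical helper: IDs are assigned in list order, starting at 1,
    which matches server.1..server.3 in the zoo.cfg above.
    """
    result = {}
    for myid, host in enumerate(hosts, start=1):
        line = f"server.{myid}={host}:{peer_port}:{election_port}"
        result[host] = (myid, line)
    return result


if __name__ == "__main__":
    mapping = zk_server_lines(["linux121", "linux122", "linux123"])
    for host, (myid, line) in mapping.items():
        # The myid file on each host must contain exactly this number.
        print(f"{host}: myid={myid}  {line}")
```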
Start the three ZooKeeper instances in turn
Start command (run on all three nodes)
/opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh start
Check the ZooKeeper status
/opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh status
Cluster start/stop script
vim zk.sh
#!/bin/bash
echo "start zookeeper server..."
if(($#==0));then
echo "no params";
exit;
fi
hosts="linux121 linux122 linux123"
for host in $hosts
do
ssh $host "source /etc/profile; /opt/lagou/servers/zookeeper-3.4.14/bin/zkServer.sh $1"
done
chmod 777 zk.sh
cd /root
./zk.sh start
./zk.sh stop
./zk.sh status
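1.2.3 Install the HBase cluster (Hadoop and ZooKeeper must be running before HBase starts)
Extract hbase-2.4.15-bin.tar.gz into the planned directory
tar -zxvf hbase-2.4.15-bin.tar.gz -C /opt/lagou/servers
Edit the configuration files
Link Hadoop's core-site.xml and hdfs-site.xml into the conf directory of the HBase installation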
ln -s /opt/lagou/servers/hadoop-2.7.3/etc/hadoop/core-site.xml /opt/lagou/servers/hbase-2.4.15/conf/core-site.xml
ln -s /opt/lagou/servers/hadoop-2.7.3/etc/hadoop/hdfs-site.xml /opt/lagou/servers/hbase-2.4.15/conf/hdfs-site.xml
Edit the configuration files under conf
cd /opt/lagou/servers/hbase-2.4.15/conf
vim hbase-env.sh
# Set the Java environment variable
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
# Use the external ZooKeeper cluster
export HBASE_MANAGES_ZK=false
vim hbase-site.xml
<configuration>
<!-- Path where HBase stores its data on HDFS -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://linux121:9000/hbase</value>
</property>
<!-- Run HBase in distributed mode -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- ZooKeeper addresses, separated by commas -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>linux121:2181,linux122:2181,linux123:2181</value>
</property>
</configuration>
vim regionservers
linux121
linux122
linux123
vim backup-masters
linux122
vim /etc/profile
export HBASE_HOME=/opt/lagou/servers/hbase-2.4.15
export PATH=$PATH:$HBASE_HOME/bin
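Distribute the HBase directory and the environment variables to the other nodes
cd /opt/lagou/servers/hadoop-2.7.3/etc/hadoop
./rsync-script /opt/lagou/servers/hbase-2.4.15
./rsync-script /etc/profile
Apply the HBase environment variables on every node
Run source /etc/profile on every node
cd /opt/lagou/servers/hbase-2.4.15/bin
Starting and stopping the HBase cluster
Prerequisite: the Hadoop and ZooKeeper clusters are already running
Start HBase: start-hbase.sh
Stop HBase: stop-hbase.sh
HBase web UI
Once the cluster is up, open <HMaster hostname>:16010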
hbase shell
linux121:16010
1.2.4 Install MySQL
Remove the MySQL packages that ship with the system
rpm -qa | grep mysql
rpm -e --nodeps mysql-libs-5.1.73-8.el6_8.x86_64
Install mysql-community-release-el6-5.noarch.rpm
rpm -ivh mysql-community-release-el6-5.noarch.rpm
Install the MySQL server
yum -y install mysql-community-server
Start the service
service mysqld start
If you see "service: command not found", install it:
yum install initscripts
Configure the database
Set the root password
/usr/bin/mysqladmin -u root password '123'
# Log in to MySQL
mysql -uroot -p123
# Empty the MySQL configuration file
>/etc/my.cnf
Edit the file
vi /etc/my.cnf
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
Restart, check the character set, and grant remote access
service mysqld restart
mysql -uroot -p123
show variables like 'character_set_%';
# Allow root to connect both locally and remotely
grant all privileges on *.* to 'root'@'%' identified by '123' with grant option;
# Reload privileges (optional)
flush privileges;
Enable the MySQL binlog
vim /etc/my.cnf
[mysqld]
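log-bin=/var/lib/mysql/mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # must be unique; must not duplicate the ID of any other server or replica
systemctl restart mysqld
mysql -uroot -p123
show variables like '%log_bin%';
Check that binlog files are being generated
cd /var/lib/mysql/
Take a VM snapshot.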
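1.3 Install Phoenix to create the HBase tables, and DataX to load data into HBase
1.3.1 Initialize the data
Run the talents.sql file from the course materials
1.3.2 Install Phoenix (download the build that matches your HBase version)
https://www.apache.org/dyn/closer.lua/phoenix/phoenix-4.16.1/phoenix-hbase-1.3-4.16.1-bin.tar.gz
Download and extract phoenix-hbase-1.3-4.16.1-bin.tar.gz
tar -xvzf phoenix-hbase-1.3-4.16.1-bin.tar.gz -C ../servers/
Copy the JAR that Phoenix needs to integrate with HBase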
cd /opt/lagou/servers/phoenix-hbase-1.3-4.16.1-bin
cp phoenix-server-hbase-1.3-4.16.1.jar /opt/lagou/servers/hbase-2.4.15/lib
scp phoenix-server-hbase-1.3-4.16.1.jar linux122:/opt/lagou/servers/hbase-2.4.15/lib
scp phoenix-server-hbase-1.3-4.16.1.jar linux123:/opt/lagou/servers/hbase-2.4.15/lib
cd /opt/lagou/servers/phoenix-hbase-1.3-4.16.1-bin/bin
Put HBase's hbase-site.xml and Hadoop's core-site.xml and hdfs-site.xml (from hadoop/etc/hadoop) into phoenix/bin/, replacing Phoenix's original configuration files
# Back up the original hbase-site.xml
mv hbase-site.xml hbase-site.xml.bak
ln -s $HBASE_HOME/conf/hbase-site.xml .
ln -s $HADOOP_HOME/etc/hadoop/core-site.xml .
ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml .
Enable secondary indexes
On each RegionServer node, edit hbase-site.xml and add the following configuration
vi /opt/lagou/servers/hbase-2.4.15/conf/hbase-site.xml
Change
<property>
<name>hbase.zookeeper.quorum</name>
<value>linux121,linux122,linux123:2181</value>
</property>
Add
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.table.sanity.checks</name>
<value>false</value>
<description>Disables sanity checks on HBase tables.</description>
</property>
stop-hbase.sh
start-hbase.sh
Full restart of all services
stop-hbase.sh
./zk.sh stop
mr-jobhistory-daemon.sh stop historyserver
stop-yarn.sh
stop-dfs.sh
start-dfs.sh
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
./zk.sh start
start-hbase.sh
A simpler alternative:
hbase clean --cleanAll
stop-hbase.sh
start-hbase.sh
Test:
cd /opt/lagou/servers/phoenix-hbase-1.3-4.16.1-bin/bin
./sqlline.py linux121:2181
This may run out of memory; check with:
free -h
1.3.3 Create the business tables in Phoenix
-- User table
DROP TABLE IF EXISTS "dim_account";
create table "dim_account" (
"id" varchar primary key,
"user"."sex" varchar,
"user"."age" varchar,
"user"."expectcity" varchar,
"user"."expectpositionname" varchar,
"user"."expectpositionnametype1" varchar,
"user"."expectpositionnametype2" varchar,
"user"."expectsalarys" varchar,
"user"."highesteducation" varchar,
"user"."latest_schoolname" varchar,
"user"."_c10" varchar,
"user"."latest_companyname" varchar,
"user"."is_famous_enterprise" varchar,
"user"."work_year" varchar,
"user"."status" varchar) column_encoded_bytes=0;
-- Company table
DROP TABLE IF EXISTS "dim_company";
create table "dim_company" (
"cid" varchar primary key,
"cy"."companyname" varchar,
"cy"."is_famous_enterprise" varchar,
"cy"."financestage" varchar,
"cy"."city" varchar,
"cy"."companysize" varchar,
"cy"."industryfield" varchar) column_encoded_bytes=0;
-- Position table
DROP TABLE IF EXISTS "dim_position";
create table "dim_position" (
"id" varchar primary key,
"position"."positionname" varchar,
"position"."positionfirstcategory" varchar,
"position"."positionsecondcategory" varchar,
"position"."positionthirdcategory" varchar,
"position"."workyear" varchar,
"position"."education" varchar,
"position"."salarymin" varchar,
"position"."salarymax" varchar,
"position"."city" varchar,
"position"."companyid" varchar,
"position"."createtime" varchar,
"position"."lastupdatetime" varchar) column_encoded_bytes=0;
Test
SELECT * FROM "dim_position";
1.3.4 Install DataX
Upload and extract datax.tar.gz
tar -xvzf datax.tar.gz -C ../servers/
Configure environment variables
vi /etc/profile
export DATAX_HOME="/opt/lagou/servers/datax"
export PATH=$PATH:${DATAX_HOME}/bin
source /etc/profile
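1.3.5 Full synchronization with DataX
This query lists a table's columns as quoted, comma-separated names, ready to paste into the JSON job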
SELECT GROUP_CONCAT('"' , COLUMN_NAME , '"' ORDER BY ORDINAL_POSITION SEPARATOR ',\n')
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'talents' AND TABLE_NAME = 'lg_account';
Use the DataX writer plugin that matches Phoenix (hbase11xsqlwriter in the jobs here)
-- User table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"sex",
"age",
"expectcity",
"expectpositionname",
"expectpositionnametype1",
"expectpositionnametype2",
"expectsalarys",
"highesteducation",
"latest_schoolname",
"c10",
"latest_companyname",
"is_famous_enterprise",
"work_year",
"status"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://linux123:3306/talents"
],
"table": [
"lg_account"
]
}
],
"password": "123",
"username": "root"
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"id",
"sex",
"age",
"expectcity",
"expectpositionname",
"expectpositionnametype1",
"expectpositionnametype2",
"expectsalarys",
"highesteducation",
"latest_schoolname",
"_c10",
"latest_companyname",
"is_famous_enterprise",
"work_year",
"status"
],
"hbaseConfig": {
"hbase.zookeeper.quorum": "linux122",
"zookeeper.znode.parent": "/hbase"
},
"nullMode": "skip",
"table": "dim_account"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
-- Company table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"cid",
"companyname",
"is_famous_enterprise",
"financestage",
"city",
"companysize",
"industryfield"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://linux123:3306/talents"
],
"table": [
"lg_company"
]
}
],
"password": "123",
"username": "root"
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"cid",
"companyname",
"is_famous_enterprise",
"financestage",
"city",
"companysize",
"industryfield"
],
"hbaseConfig": {
"hbase.zookeeper.quorum": "linux122",
"zookeeper.znode.parent": "/hbase"
},
"nullMode": "skip",
"table": "dim_company"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
-- Position table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"positionname",
"positionfirstcategory",
"positionsecondcategory",
"positionthirdcategory",
"workyear",
"education",
"salarymin",
"salarymax",
"city",
"companyid",
"createtime",
"lastupdatetime"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://linux123:3306/talents"
],
"table": [
"lg_position"
]
}
],
"password": "123",
"username": "root"
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"id",
"positionname",
"positionfirstcategory",
"positionsecondcategory",
"positionthirdcategory",
"workyear",
"education",
"salarymin",
"salarymax",
"city",
"companyid",
"createtime",
"lastupdatetime"
],
"hbaseConfig": {
"hbase.zookeeper.quorum": "linux122",
"zookeeper.znode.parent": "/hbase"
},
"nullMode": "skip",
"table": "dim_position"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
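The three jobs above differ only in the table names and column lists, so they can be generated instead of hand-edited. The following Python sketch assumes the hosts, credentials, and writer settings shown in the company and position jobs. `build_datax_job` is a hypothetical helper, not part of DataX.

```python
import json


def build_datax_job(mysql_table, hbase_table, reader_columns, writer_columns=None,
                    mysql_host="linux123", zk_quorum="linux122",
                    username="root", password="123"):
    """Build a mysqlreader -> hbase11xsqlwriter job as a Python dict.

    Hypothetical helper: the default values are taken from the jobs above.
    """
    # The writer columns default to the reader columns; pass them explicitly
    # when a name differs (for example c10 in MySQL vs _c10 in Phoenix).
    writer_columns = list(writer_columns or reader_columns)
    if len(writer_columns) != len(reader_columns):
        raise ValueError("reader and writer column lists must have the same length")
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "column": list(reader_columns),
            "connection": [{
                "jdbcUrl": [f"jdbc:mysql://{mysql_host}:3306/talents"],
                "table": [mysql_table],
            }],
            "password": password,
            "username": username,
        },
    }
    writer = {
        "name": "hbase11xsqlwriter",
        "parameter": {
            "batchSize": "256",
            "column": writer_columns,
            "hbaseConfig": {
                "hbase.zookeeper.quorum": zk_quorum,
                "zookeeper.znode.parent": "/hbase",
            },
            "nullMode": "skip",
            "table": hbase_table,
        },
    }
    return {"job": {"content": [{"reader": reader, "writer": writer}],
                    "setting": {"speed": {"channel": "5"}}}}


if __name__ == "__main__":
    company_columns = ["cid", "companyname", "is_famous_enterprise",
                       "financestage", "city", "companysize", "industryfield"]
    job = build_datax_job("lg_company", "dim_company", company_columns)
    print(json.dumps(job, indent=2))
```

The printed JSON can be saved as a job file under $DATAX_HOME/job and run with datax.py in the usual way.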
Test
cd $DATAX_HOME/bin
vim $DATAX_HOME/job/mysql2phoenix_account.json
vim $DATAX_HOME/job/mysql2phoenix_company.json
vim $DATAX_HOME/job/mysql2phoenix_position.json
python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/mysql2phoenix_account.json
python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/mysql2phoenix_company.json
python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/mysql2phoenix_position.json
1.3.6 Install Kafka
Upload kafka_2.12-1.0.2.tgz to the server and extract it:
tar -xvzf kafka_2.12-1.0.2.tgz -C ../servers/
Distribute the installation
cd /opt/lagou/servers/hadoop-2.7.3/etc/hadoop
./rsync-script /opt/lagou/servers/kafka_2.12-1.0.2
Configure environment variables
vim /etc/profile
export KAFKA_HOME=/opt/lagou/servers/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin
Distribute the environment variables
./rsync-script /etc/profile
Apply them
source /etc/profile
Edit the configuration file on linux121
vim $KAFKA_HOME/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux121:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
Distribute the configuration file
./rsync-script $KAFKA_HOME/config/server.properties
Edit the configuration file on linux122
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux122:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
Edit the configuration file on linux123
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux123:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
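Only broker.id and advertised.listeners change from broker to broker; the other three settings are identical. The per-host values can be derived as in this Python sketch, which assumes the host order linux121, linux122, linux123 maps to broker IDs 0, 1, 2 as above. `kafka_overrides` is a hypothetical helper.

```python
def kafka_overrides(hosts, zk_chroot="/myKafka",
                    log_dirs="/var/lagou/kafka/kafka-logs"):
    """Return the server.properties overrides for each broker host.

    Hypothetical helper: broker IDs follow list order, starting at 0.
    """
    # All brokers share one ZooKeeper connection string with a chroot path.
    zk_connect = ",".join(f"{h}:2181" for h in hosts) + zk_chroot
    overrides = {}
    for broker_id, host in enumerate(hosts):
        overrides[host] = {
            "broker.id": str(broker_id),
            "listeners": "PLAINTEXT://:9092",
            "advertised.listeners": f"PLAINTEXT://{host}:9092",
            "log.dirs": log_dirs,
            "zookeeper.connect": zk_connect,
        }
    return overrides


if __name__ == "__main__":
    for host, props in kafka_overrides(["linux121", "linux122", "linux123"]).items():
        print(f"# {host}")
        for key, value in props.items():
            print(f"{key}={value}")
```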
Start the Kafka cluster (run on every machine)
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Test
cd /opt/lagou/servers/zookeeper-3.4.14/bin
./zkCli.sh
# Show the registration info of each broker
get /myKafka/brokers/ids/0
get /myKafka/brokers/ids/1
get /myKafka/brokers/ids/2
1.3.7 Install Maxwell (Linux123)
Upload and extract maxwell-1.29.0.tar.gz
tar -xvzf maxwell-1.29.0.tar.gz -C ../servers/
cd ../servers/maxwell-1.29.0
Write the job configuration file
vim driver.properties
######### binlog ###############
log_level=INFO
producer=kafka
host = linux123
user = maxwell
password = 123456
producer_ack_timeout = 600000
######### binlog ###############
######### output format stuff ###############
output_binlog_position=true
output_server_id=true
output_thread_id=true
output_commit_info=true
output_row_query=true
output_ddl=false
output_nulls=true
output_xoffset=true
output_schema_id=true
######### output format stuff ###############
############ kafka stuff #############
kafka.bootstrap.servers=linux121:9092,linux122:9092,linux123:9092
kafka_topic=mysql_incre
kafka_partition_hash=murmur3
kafka_key_format=hash
kafka.retries=5
kafka.acks=all
producer_partition_by=primary_key
############ kafka stuff #############
############## misc stuff ###########
bootstrapper=async
############## filter ###############
filter=exclude:*.*, include:talents.*
Create the maxwell user
mysql -uroot -p123
INSTALL PLUGIN validate_password SONAME 'validate_password.so';
set global validate_password_policy=LOW;
set global validate_password_length=4;
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
CREATE USER 'maxwell'@'linux123' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT ALL ON maxwell.* TO 'maxwell'@'linux123';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'linux123';
flush privileges;
Start a console consumer
kafka-console-consumer.sh --bootstrap-server linux121:9092,linux122:9092,linux123:9092 --topic mysql_incre
Start Maxwell
cd /opt/lagou/servers/maxwell-1.29.0
bin/maxwell --config driver.properties
nohup bin/maxwell --daemon --config driver.properties >> maxwell.log 2>&1 &
If Maxwell reports an inconsistent binlog position after a restart, drop the maxwell database and start again
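1.3.8 Incremental synchronization with Maxwell
Set up Scala on Windows
Download scala-2.12.20.msi
and install it on Windows
Configure Scala in IDEA
A Flink program consumes the data from Kafka and writes it to HBase
First check that data can be read from Kafka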
/**
 * 1. Use Flink to consume the mysql_incre topic in Kafka.
 * 2. Parse each operation and sync the data into the corresponding HBase table.
 * Messages in Kafka have the following format:
 * {"database":"talents","table":"lg_account","type":"update","ts":1612503687,"xid":5254102,
 * "commit":true,"position":"mysql-bin.000001:125536870","server_id":1,"thread_id":1443,
 * "schema_id":221,"data":{"id":556,"sex":"男","age":23,"expectcity":"北京","expectpositionname":"廣告協調",
 * "expectpositionnametype1":"市場|商務類","expectpositionnametype2":"媒介|公關","expectsalarys":"20k-40k",
 * "highesteducation":"本科","latest_schoolname":"北京工商大學","c10":"0","latest_companyname":"昌榮傳媒股份有限公司",
 * "is_famous_enterprise":"1","work_year":"10年","status":"離職找工作"},"old":{"age":33}}
 */
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable
object SyncApp {
def main(args: Array[String]): Unit = {
// Get the Flink execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create the Kafka consumer (SourceKafka is a project helper class, not part of Flink)
val kafkaSource: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("mysql_incre")
val Stream = env.addSource(kafkaSource)
Stream.print()
// Start the job
env.execute("mysql_data_incre_sync")
}
}
Then parse the received data
// Parse the data sent by Maxwell (JSON is assumed to be fastjson's com.alibaba.fastjson.JSON)
val tableObjectStream: DataStream[TableObject] = Stream.map(msg => {
val jsonObject = JSON.parseObject(msg)
// Database name
val databaseName = jsonObject.get("database")
// Table name
val tableName = jsonObject.get("table")
// Operation type (insert, update, delete)
val typeInfo = jsonObject.get("type")
// Latest row data
val newData = jsonObject.get("data")
TableObject(databaseName.toString, tableName.toString, typeInfo.toString, newData.toString)
})
tableObjectStream.print()
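The parsing step can be tried outside Flink on a single message. The Python sketch below mirrors the Scala mapping with only the standard library. The sample message is a shortened version of the format shown earlier, and `parse_maxwell` is a hypothetical name.

```python
import json
from typing import NamedTuple


class TableObject(NamedTuple):
    """Python counterpart of the Scala TableObject case class."""
    database: str
    table_name: str
    type_info: str
    data_info: str


def parse_maxwell(message: str) -> TableObject:
    """Extract database, table, operation type, and row data from one message."""
    obj = json.loads(message)
    # Keep the row data as a JSON string, as the Scala code does with toString.
    data = json.dumps(obj["data"], ensure_ascii=False)
    return TableObject(obj["database"], obj["table"], obj["type"], data)


if __name__ == "__main__":
    # Shortened sample in the Maxwell output format shown above.
    sample = ('{"database":"talents","table":"lg_account","type":"update",'
              '"ts":1612503687,"data":{"id":556,"age":23},"old":{"age":33}}')
    print(parse_maxwell(sample))
```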
Then write the data to HBase
See the lagou_deliver code
Verify
select * from "dim_account" where "id" = '30';
1.4 Install Hive and ClickHouse for real-time OLAP analysis of user behavior data

1.4.1 Install Hive (Linux122)
Download mysql-connector-java-5.1.46.jar
https://downloads.mysql.com/archives/c-j/
Download and extract apache-hive-2.3.7-bin.tar.gz
http://archive.apache.org/dist/hive/
cd /opt/lagou/software
tar zxvf apache-hive-2.3.7-bin.tar.gz -C ../servers/
cd ../servers
mv apache-hive-2.3.7-bin hive-2.3.7
vi /etc/profile
export HIVE_HOME=/opt/lagou/servers/hive-2.3.7
export PATH=$PATH:$HIVE_HOME/bin
# Apply the changes
source /etc/profile
cd $HIVE_HOME/conf
cp hive-default.xml.template hive-site.xml
vi hive-site.xml
Add or change the following
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- Where the Hive metadata is stored -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://linux123:3306/hivemetadata?createDatabaseIfNotExist=true&amp;useSSL=false</value>
<description>JDBC connect string for a JDBC
metastore</description>
</property>
<!-- JDBC driver class -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC
metastore</description>
</property>
<!-- User name for the metastore database -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore
database</description>
</property>
<!-- Password for the metastore database -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>12345678</value>
<description>password to use against metastore
database</description>
</property>
</configuration>
Note
Remove every "system:" prefix in the configuration file, and do not leave spaces inside the configured values
cd /opt/lagou/software
cp mysql-connector-java-5.1.46.jar $HIVE_HOME/lib/
Run in MySQL:
-- Create the user, set the password, grant privileges, and reload
CREATE USER 'hive'@'%' IDENTIFIED BY '12345678';
GRANT ALL ON *.* TO 'hive'@'%';
FLUSH PRIVILEGES;
schematool -dbType mysql -initSchema
hive
show functions;
1.4.2 Import the business data into Hive
create database ods;
use ods;
CREATE EXTERNAL TABLE ods.`ods_company`(
`cid` STRING,
`companyname` string,
`is_famous_enterprise` string,
`financestage` string,
`city` string,
`companysize` string,
`industryfield` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES
("hbase.columns.mapping"
=":key,cy:companyname,cy:is_famous_enterprise,cy:financestage,cy:city,cy:companysize,cy:industryfield")
TBLPROPERTIES("hbase.table.name" = "dim_company");
CREATE EXTERNAL TABLE ods.`ods_account1`(
`id` String,
`sex` string,
`age` String,
`expectcity` string,
`expectpositionname` string,
`expectpositionnametype1` string,
`expectpositionnametype2` string,
`expectsalarys` string,
`highesteducation` string,
`latest_schoolname` string,
`_c10` string,
`latest_companyname` string,
`is_famous_enterprise` string,
`work_year` string,
`status` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES
("hbase.columns.mapping"
=":key,user:sex,user:age,user:expectcity,user:expectpositionname,user:expectpositionnametype1,user:expectpositionnametype2,user:expectsalarys,user:highesteducation,user:latest_schoolname,user:_c10,user:latest_companyname,user:is_famous_enterprise,user:work_year,user:status")
TBLPROPERTIES("hbase.table.name" = "dim_account");
CREATE EXTERNAL TABLE ods.`ods_position`(
`id` string,
`positionname` string,
`positionfirstcategory` string,
`positionsecondcategory` string,
`positionthirdcategory` string,
`workyear` string,
`education` string,
`salarymin` STRING,
`salarymax` STRING,
`city` string,
`companyid` STRING,
`createtime` string,
`lastupdatetime` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES
("hbase.columns.mapping"
=":key,position:positionname,position:positionfirstcategory,position:positionsecondcategory,position:positionthirdcategory,position:workyear,position:education,position:salarymin,position:salarymax,position:city,position:companyid,position:createtime,position:lastupdatetime")
TBLPROPERTIES("hbase.table.name" = "dim_position");
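The hbase.columns.mapping value must be a single unbroken string: `:key` for the row key, followed by `family:qualifier` for each remaining Hive column, in the same order as the column list. Here is a short Python sketch that builds it. `hbase_columns_mapping` is a hypothetical helper, and the column family names are the ones used in the Phoenix DDL above.

```python
def hbase_columns_mapping(family, columns):
    """Build the hbase.columns.mapping string for a Hive external table.

    Hypothetical helper: the first Hive column maps to the HBase row key,
    every other column maps to <family>:<column>.
    """
    parts = [":key"] + [f"{family}:{name}" for name in columns[1:]]
    return ",".join(parts)


if __name__ == "__main__":
    company_columns = ["cid", "companyname", "is_famous_enterprise",
                       "financestage", "city", "companysize", "industryfield"]
    print(hbase_columns_mapping("cy", company_columns))
```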
1.4.2 Install ClickHouse (Linux122)
Download v23.12.1.1368-stable
Upload the four files to /opt/lagou/software/clickhouse_rpm
Install
rpm -ivh ./*.rpm
vi /etc/clickhouse-server/config.xml
Add
<listen_host>0.0.0.0</listen_host>

Start
sudo -u clickhouse clickhouse-server --config-file=/etc/clickhouse-server/config.xml
Connect
clickhouse-client -m
1.4.3 Real-time ETL
Look up the matching records in HBase by user ID, position ID, and company ID,
then insert the enriched rows into ClickHouse
See the lagou_deliver code
mvn install:install-file -DgroupId=com.clickhouse -DartifactId=clickhouse-jdbc -Dversion=0.6.3 -Dpackaging=jar -Dfile=E:\clickhouse-jdbc-0.6.3.jar
CREATE DATABASE IF NOT EXISTS lg_deliver_detail;
DROP TABLE IF EXISTS lg_deliver_detail.deliver_detail;
CREATE TABLE lg_deliver_detail.deliver_detail(
user_id UInt64,
work_year String,
expectpositionname String,
positionid UInt64,
positionname String,
positionfirstcategory String,
positionsecondcategory String,
companyid UInt64,
companyname String,
highesteducation String,
company_city String,
is_famous_enterprise Int8,
companysize String,
expectsalarys String,
expectcity String,
education String,
gender String,
city String,
workyear String,
status String,
dt String
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY user_id
SETTINGS index_granularity = 8192;
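The enrichment step of the real-time ETL (look up user, position, and company by ID, then build one wide row for deliver_detail) can be sketched as follows. This is not the lagou_deliver implementation: plain dictionaries stand in for the HBase lookups, the event fields are assumptions, and only a subset of the table's columns is filled.

```python
def enrich_delivery(event, accounts, positions, companies):
    """Join one delivery event with its user, position, and company records.

    Hypothetical sketch: `accounts`, `positions`, and `companies` are dicts
    keyed by row key, standing in for lookups against the HBase tables.
    """
    user = accounts.get(event["user_id"], {})
    position = positions.get(event["positionid"], {})
    company = companies.get(event["companyid"], {})
    return {
        "user_id": int(event["user_id"]),
        "positionid": int(event["positionid"]),
        "companyid": int(event["companyid"]),
        "expectcity": user.get("expectcity", ""),
        "positionname": position.get("positionname", ""),
        "city": position.get("city", ""),
        "companyname": company.get("companyname", ""),
        "company_city": company.get("city", ""),
        "dt": event["dt"],
    }


if __name__ == "__main__":
    # Hypothetical sample data; real values come from HBase and Kafka.
    accounts = {"556": {"expectcity": "Beijing"}}
    positions = {"1001": {"positionname": "Data Engineer", "city": "Shanghai"}}
    companies = {"42": {"companyname": "ExampleCo", "city": "Shanghai"}}
    event = {"user_id": "556", "positionid": "1001", "companyid": "42", "dt": "2021-02-05"}
    print(enrich_delivery(event, accounts, positions, companies))
```

Missing lookups fall back to empty strings here so that a row can still be written; whether to drop or keep such rows is a design choice for the real job.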
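1.4.4 Install Superset
Install the Python environment
mkdir /opt/soft
curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
Press Enter, then keep pressing Space to page through the license; at the prompt "Please answer 'yes' or 'no':" type yes.
Enter the installation path /opt/soft/conda, or press Enter to accept the default
>>>/opt/soft/conda
PREFIX=/opt/soft/conda
When asked whether to initialize Miniconda3, answer yes
Do you wish the installer to initialize Miniconda3
[no] >>> yes
Configure the system environment variables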
vim /etc/profile
export CONDA_HOME=/opt/soft/conda
export PATH=$PATH:$CONDA_HOME/bin
source /etc/profile
source ~/.bashrc
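The prompt now starts with (base), and the Python version is 3.11
Disable automatic activation of the base environment
conda config --set auto_activate_base false
bash
Check the conda version
conda --version
conda 24.3.0
Configure conda mirrors hosted in China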
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
conda config --set show_channel_urls yes
conda config --show channels
Create a Python 3.9 environment
conda create --name superset python=3.9
y
Activate the superset environment
conda activate superset
To leave the environment, run:
conda deactivate
Deploy Superset
1) Install the prerequisite packages
sudo yum install -y gcc gcc-c++ libffi-devel python-devel python-pip python-wheel python-setuptools openssl-devel cyrus-sasl-devel openldap-devel
2) Upgrade setuptools and pip
pip install --upgrade setuptools pip
3) Install Superset
pip install apache-superset --trusted-host repo.huaweicloud.com -i https://repo.huaweicloud.com/repository/pypi/simple
4) Initialize the database
superset db upgrade
If this fails with an error about a weak SECRET_KEY:
pip show apache-superset
Go to the Superset installation path
Generate a value to use in place of paste_your_generated_secret_key_here
openssl rand -base64 42
vi superset_config.py
SECRET_KEY = 'paste_your_generated_secret_key_here'
For example:
SECRET_KEY = 'ocuiR5/s93tYYrIjuGhMFkWrM00tt7Kd3lt2tJ07rAnxgp+cg4jKFmHF'
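A key of the same shape as the output of `openssl rand -base64 42` can also be produced from Python, which is convenient when openssl is not installed. This is a minimal sketch using only the standard library. `generate_secret_key` is a hypothetical helper, not a Superset command.

```python
import base64
import os


def generate_secret_key(num_bytes=42):
    """Return num_bytes of OS randomness, base64-encoded as ASCII text."""
    return base64.b64encode(os.urandom(num_bytes)).decode("ascii")


if __name__ == "__main__":
    # Paste the printed line into superset_config.py.
    print(f"SECRET_KEY = '{generate_secret_key()}'")
```

With 42 random bytes the encoded key is 56 characters long and needs no padding.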
vi /etc/profile
export SUPERSET_CONFIG_PATH=/opt/soft/conda/envs/superset/superset_config.py
source /etc/profile
5) Create an admin user
export FLASK_APP=superset
superset fab create-admin
Username [admin]:
User first name [admin]:
User last name [user]:
Email [admin@fab.org]:
Password:
Repeat for confirmation:
Recognized Database Authentications.
The user name root and the password 12345678 are used here.
6) Initialize Superset
superset init
7) Install gunicorn
pip install gunicorn -i https://pypi.douban.com/simple/
8) Start Superset
superset run -h linux122 -p 8080 --with-threads --reload --debugger
gunicorn --workers 5 --timeout 120 --bind [ip]:[port] "superset.app:create_app()" --daemon
To stop Superset, run:
ps -ef | awk '/superset/ && !/awk/{print $2}' | xargs kill -9
9) Log in to Superset
linux122:8080
User name: root
Password: 12345678
Open ip:[port] and log in with the admin account created earlier.
Connect a database:
Install the prerequisites first:
conda activate superset
yum install python-devel -y
pip install gevent
sudo yum groupinstall -y 'Development Tools'
yum install mysql-devel -y
yum install gcc -y
pip install mysqlclient
If that fails with an error, install an older version instead:
pip install mysqlclient==1.4.4
Test with this connection URI:
mysql://root:123@linux123/superset_demo?charset=utf8
Open SQL Lab > SQL Editor and write the following SQL
Select the database
select case when gender = 0 then 'male' when gender = 1 then 'female' else 'undisclosed' end as gender, count(id) as total_count from t_user group by gender;
Save the query
Click Saved Queries
Run the query, then click Explore to browse the data
Set the chart type to Bar Chart
Set the metric to sum(total_count)
Set the series to gender
1.4.5 Show ClickHouse data in Superset
Install the driver
See the official documentation:
Connecting to Databases | Superset
pip install clickhouse-connect
Click Databases and add a ClickHouse connection
clickhousedb://default:click@linux122:8123/default
clickhousedb://linux122:8123/default
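Both URIs follow the same pattern, with the credentials part optional. If the password contains characters such as `@` or `/`, it has to be percent-encoded before it goes into the URI. A small Python sketch of that rule follows; `clickhouse_uri` is a hypothetical helper, not part of Superset or clickhouse-connect.

```python
from urllib.parse import quote_plus


def clickhouse_uri(host, port=8123, database="default", user=None, password=None):
    """Build a clickhousedb:// SQLAlchemy URI, encoding the credentials."""
    auth = ""
    if user:
        auth = quote_plus(user)
        if password:
            # Percent-encode special characters so they do not break the URI.
            auth += ":" + quote_plus(password)
        auth += "@"
    return f"clickhousedb://{auth}{host}:{port}/{database}"


if __name__ == "__main__":
    print(clickhouse_uri("linux122", user="default", password="click"))
    print(clickhouse_uri("linux122"))
```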
Add charts
Number of deliveries by expected city
select expectcity, count(1) total_cnt from lg_deliver_detail.deliver_detail group by
expectcity;
Number of users by expected city
select expectcity, count(distinct(user_id)) total_user_cnt from lg_deliver_detail.deliver_detail group by expectcity;
Number of positions by position city
select count(distinct(positionid)) total_jobs, city from lg_deliver_detail.deliver_detail
group by city;
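Counting distinct users per expected city means grouping by the city only: if user_id is also a grouping key, every group contains exactly one user and the count is always 1. The Python sketch below shows the intended aggregation on a few hypothetical rows (the data is invented for illustration).

```python
from collections import defaultdict


def users_per_city(rows):
    """Count distinct user IDs per city from (user_id, expectcity) rows."""
    users_by_city = defaultdict(set)
    for user_id, city in rows:
        # A set keeps each user once per city, like count(distinct user_id).
        users_by_city[city].add(user_id)
    return {city: len(users) for city, users in users_by_city.items()}


if __name__ == "__main__":
    # Hypothetical delivery rows: user 1 delivered twice with the same expected city.
    rows = [(1, "Beijing"), (1, "Beijing"), (2, "Beijing"), (3, "Shanghai")]
    print(users_per_city(rows))
```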
Add the charts built above to a dashboard
Steps:
1. Click Dashboards > add a dashboard
2. Drag the previously built charts onto the dashboard