PySpark does the modeling (comparable to the DWD layer); Flask serves front-end requests directly for recall (comparable to the ADS layer, but the results are not persisted)
2. Spark MLlib
2.1 Preparing the Spark MLlib development environment
2.1.1 Configure the Python and Spark environments
Install the Python environment
Install Anaconda3-5.2.0-Windows-x86_64.exe
Configure the environment variables
Anaconda_HOME
E:\20241014_Soft\Anaconda3
PATH
%Anaconda_HOME%\Scripts;%Anaconda_HOME%\Library\mingw-w64\bin;%Anaconda_HOME%\Library\usr\bin;%Anaconda_HOME%\Library\bin
Open the Anaconda Prompt and check the version:
conda --version
Install the Spark environment
Configure the Spark runtime and environment variables on Windows
spark-2.4.5-bin-hadoop2.7.tgz
Extract the Spark package to a directory on disk
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7
Add SPARK_HOME to the environment variables, pointing to the extracted path
SPARK_HOME
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7
Copy the following directory from the extracted Spark package
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7\python\lib
to the corresponding Anaconda directory
E:\20241014_Soft\Anaconda3\Lib\site-packages

Step 3: Verify that py4j is installed: start a Python shell and run import py4j

Step 5: Import the pyspark module; if no error occurs, the installation succeeded: import pyspark

2.1.2 Install the Python plugin in IDEA



Create a new Python project, pyspark_test
Count the total number of applications and the total number of applicants per position
Count the total number of applicants and applications for a specified region
Find the top-N positions with the most applications in each region
See the pyspark_test code; a rough sketch of these aggregations follows
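A minimal sketch of the three statistics with the DataFrame API, assuming an application-record table with columns user_id, position_id and region (hypothetical names; the real pyspark_test code may differ):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("pyspark_test").enableHiveSupport().getOrCreate()
actions = spark.table("ods.user_action")  # hypothetical source of application records

# total applications and distinct applicants per position
per_position = actions.groupBy("position_id").agg(
    F.count("*").alias("apply_cnt"),
    F.countDistinct("user_id").alias("applicant_cnt"))

# total applicants and applications for one specified region
region_stats = actions.filter(F.col("region") == "Beijing").agg(
    F.count("*").alias("apply_cnt"),
    F.countDistinct("user_id").alias("applicant_cnt"))

# top-N positions by application count within each region
w = Window.partitionBy("region").orderBy(F.desc("apply_cnt"))
top_n = (actions.groupBy("region", "position_id")
         .agg(F.count("*").alias("apply_cnt"))
         .withColumn("rn", F.row_number().over(w))
         .filter(F.col("rn") <= 3))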
Create a new Python project, sparkmlib
Linear regression model (continuous target)
Logistic regression model (categorical target)
Decision tree model (categorical target)
Random forest model (categorical target)
See the sparkmlib code; a sketch of one of these models follows
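As one hedged example, a logistic regression with spark.ml might look like the sketch below; the input file, feature columns and label column are placeholders, and LinearRegression, DecisionTreeClassifier or RandomForestClassifier can be substituted into the same pipeline:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("sparkmlib").getOrCreate()
df = spark.read.csv("sample.csv", header=True, inferSchema=True)  # placeholder input

# assemble the raw numeric columns into a single feature vector
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features")
train, test = assembler.transform(df).randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
model = lr.fit(train)
model.transform(test).select("label", "prediction", "probability").show(5)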
2.1.3 Install the Spark cluster
Download and install
spark-2.4.5-bin-hadoop2.7.tgz
cd /opt/lagou/software/
tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
sudo chown -R root:root spark-2.4.5-bin-hadoop2.7
sudo chmod -R 755 spark-2.4.5-bin-hadoop2.7
mv spark-2.4.5-bin-hadoop2.7 ../servers/spark-2.4.5
Configuration
vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile
File location:
cd $SPARK_HOME/conf
Files to modify: slaves, spark-defaults.conf, spark-env.sh, log4j.properties
cp log4j.properties.template log4j.properties
cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh
vi slaves
linux121
linux122
linux123
vi spark-defaults.conf
spark.master spark://linux121:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://linux121:9000/spark-eventlog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 512m
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar
Edit spark-env.sh
vi spark-env.sh
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.7.3
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.7.3/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.7.3/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077
vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
Add:
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
# Configure spark-sql to read the Hive metadata
## Symlink hive-site.xml into Spark's conf directory:
cd $SPARK_HOME/conf
ln -s $HIVE_HOME/conf/hive-site.xml hive-site.xml
vi hive-site.xml
Modify:
<property>
<name>hive.metastore.uris</name>
<value>thrift://linux122:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
</description>
</property>
scp -r hive-site.xml linux121:/opt/lagou/servers/spark-2.4.5/conf
scp -r hive-site.xml linux123:/opt/lagou/servers/spark-2.4.5/conf
## Copy the MySQL connector JAR (used by the Hive metastore) into Spark's jars directory
cp $HIVE_HOME/lib/mysql-connector-java-5.1.46.jar $SPARK_HOME/jars
Distribute the Spark installation to the cluster and update the environment variables on the other nodes
cd /opt/lagou/servers/
scp -r spark-2.4.5/ linux122:$PWD
scp -r spark-2.4.5/ linux121:$PWD
scp -r spark-2.4.5/ linux123:$PWD
source /etc/profile
Note: to read Hive external tables (Hive mapped to HBase) from pyspark, the relevant HBase and Hive JARs must additionally be copied into Spark.
cp $HBASE_HOME/lib/hbase-*.jar $SPARK_HOME/jars/
cp $HIVE_HOME/lib/hive-*.jar $SPARK_HOME/jars/
# Upload the JARs under $SPARK_HOME/jars to HDFS
Create the HDFS directories:
hdfs dfs -rm -r /spark-eventlog
hdfs dfs -rm -r /spark-yarn
hdfs dfs -mkdir /spark-eventlog
hdfs dfs -mkdir -p /spark-yarn/jars/
cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/
Additional notes (linux122), important (note: because of version mismatches this part may not run; get HBase working first, then spark-sql. The only reliable fix is to pin compatible versions up front; swapping in JARs alone will not solve the problem.)
rm -rf $SPARK_HOME/jars/metrics-core-4.1.1.jar
scp $HBASE_HOME/lib/metrics-core-4.1.1.jar linux122:$SPARK_HOME/jars
scp $HBASE_HOME/lib/metrics-core-2.1.3.jar linux122:$HBASE_HOME/lib/
hdfs dfs -rm -r /spark-yarn/jars/metrics-core-2.1.3.jar
rm -rf $SPARK_HOME/jars/lz4-java-1.4.0.jar
scp lz4-java-1.8.1.jar linux122:$SPARK_HOME/jars/
rm -rf $SPARK_HOME/jars/metrics-core-2.2.0.jar
hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.8.1.jar
cd $SPARK_HOME/jars/
rm -rf parquet-hadoop-bundle-1.6.0.jar
hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.0.jar
hdfs dfs -put lz4-java-1.4.1.jar /spark-yarn/jars/
hdfs dfs -put $HBASE_HOME/lib/metrics-core-4.1.1.jar /spark-yarn/jars/
scp $SPARK_HOME/jars/*.jar linux121:$SPARK_HOME/jars/
scp $SPARK_HOME/jars/*.jar linux123:$SPARK_HOME/jars/
rm -rf $SPARK_HOME/jars/lz4-java-1.4.1.jar
hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.1.jar
hdfs dfs -get /spark-yarn/jars/*
Start the services (prerequisites: Hadoop's HDFS, YARN and HistoryServer are running normally; the Spark history server is running normally)
scp lz4-java-1.8.0.jar linux122:$SPARK_HOME/jars/
hdfs dfs -put lz4-java-1.8.0.jar /spark-yarn/jars/
cd $SPARK_HOME/sbin
./stop-all.sh
./start-all.sh
#linux122
ps aux | grep metastore
hive --service metastore &
#linux121
spark-sql --master yarn
spark-shell
Test
http://192.168.49.121:8080/
http://192.168.49.121:18080/
2.1.4 Activate the Python 3 environment and start Jupyter Notebook
# Create a new environment named 'spark-env' with Python 3.7
conda create -n spark-env python=3.7
# Activate the new environment
conda activate spark-env
Install the required packages
conda install py4j
conda install jieba
conda install pyspark==2.4.5
conda install pyhive
conda install happybase==1.2.0
conda uninstall jupyter
# Or install Jupyter Notebook 6.0.3
conda install notebook=6.0.3
cd /opt/soft
mkdir -p /opt/soft/conda
chmod 777 /opt/soft/conda
scp -r conda/ linux121:/opt/soft
scp -r conda/ linux123:/opt/soft
/opt/soft/conda
Update the environment variables on the other nodes
Add:
vim /etc/profile
export CONDA_HOME=/opt/soft/conda
export PATH=$PATH:$CONDA_HOME/bin
source /etc/profile
Create the project directory
mkdir -p /root/data/code/job_recommended/
chmod 777 /root/data/code/job_recommended/
cd /root/data/code/job_recommended/
Start
Run the following from the project directory
# export TZ=Asia/Shanghai
# export LANG=en_US.UTF-8
jupyter notebook --port=8889 --ip=0.0.0.0 --no-browser --allow-root
use ods;
show tables;
desc ods_position;
Import the SQL file
hive -f /root/data/user_action.sql
In Jupyter Notebook, create a new notebook, concat_fields.ipynb
The code is at /root/data/code/job_recommended/concat_fields.ipynb
Python 3 location: /opt/soft/conda/envs/superset/bin
Create the corresponding table in Hive and insert the results into it (a write-out sketch follows the DDL below)
drop table `ods.ods_position_content`;
CREATE TABLE `ods.ods_position_content`(
`id` string,
`region` string,
`position_category` string,
`content` string)
row format delimited fields terminated by ',';
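A minimal sketch of what concat_fields.ipynb writes into this table; the source table columns being concatenated (position_name, position_detail) are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("concat_fields").enableHiveSupport().getOrCreate()
pos = spark.table("ods.ods_position")

content = pos.select(
    F.col("id").cast("string").alias("id"),
    "region",
    "position_category",
    # assumed source columns; the real notebook may concatenate different fields
    F.concat_ws(",", "position_name", "position_detail").alias("content"))

content.write.insertInto("ods.ods_position_content", overwrite=True)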
2.1.5 TF-IDF
Create the directory and place the keyword/stopword files into it
sudo mkdir -p /data/words
ITKeywords.txt stopwords.txt
Create the HDFS directory for the models
hdfs dfs -rm -r /lgns/lg_models/
hdfs dfs -mkdir -p /lgns/lg_models/
Create the corresponding tables in Hive
drop table `idf_keywords_values`;
CREATE TABLE idf_keywords_values(
keyword STRING comment "keyword",
idf DOUBLE comment "idf",
index INT comment "index");
-- store the per-position TF-IDF values
CREATE TABLE tfidf_keywords_values(
position_id INT comment "position_id",
region string comment "region",
keyword STRING comment "keyword",
tfidf DOUBLE comment "tfidf");
Create a new notebook, compute_tfidf.ipynb
See compute_tfidf.ipynb; a rough sketch of the pipeline follows
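A sketch of the TF-IDF computation with CountVectorizer and IDF, assuming a segmented-words DataFrame words_df with columns (position_id, region, words array<string>); the source table name and schema are assumptions, not the actual notebook:

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, IDF

spark = SparkSession.builder.appName("compute_tfidf").enableHiveSupport().getOrCreate()
words_df = spark.table("ods.position_words")  # hypothetical: (position_id, region, words)

# term frequencies over a fixed vocabulary
cv = CountVectorizer(inputCol="words", outputCol="tf", vocabSize=200000, minDF=1.0)
cv_model = cv.fit(words_df)
tf_df = cv_model.transform(words_df)

# inverse document frequencies
idf = IDF(inputCol="tf", outputCol="tfidf")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

# persist the fitted models under the HDFS directory created above
cv_model.write().overwrite().save("/lgns/lg_models/CV.model")
idf_model.write().overwrite().save("/lgns/lg_models/IDF.model")

# the vocabulary plus its IDF weights backs idf_keywords_values;
# exploding tfidf_df per keyword backs tfidf_keywords_values
keywords_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray()))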
2.1.6 TextRank
Create the textrank_keywords_values table
drop table if exists textrank_keywords_values ;
CREATE TABLE textrank_keywords_values(
position_id INT comment "position_id",
region String comment "region",
industry String comment "industry",
keyword STRING comment "keyword",
textrank DOUBLE comment "textrank");
Create a new notebook, compute_textrank.ipynb
See the compute_textrank.ipynb code; a sketch of the keyword extraction follows
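A minimal sketch of per-position TextRank keyword extraction using jieba (installed earlier); how the real notebook parallelizes this over the content table is an assumption:

import jieba.analyse

def textrank_keywords(position_id, region, industry, content, top_k=20):
    # yields rows shaped like the textrank_keywords_values table
    tags = jieba.analyse.textrank(content, topK=top_k, withWeight=True,
                                  allowPOS=("n", "ns", "vn", "v", "eng"))
    for keyword, weight in tags:
        yield (position_id, region, industry, keyword, float(weight))

# e.g. applied to the content rows with an RDD flatMap before writing to Hive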
Create the tables
create table position_profile(
position_id String,
region String,
keywords MAP<String,String>,
topics ARRAY<String>
);
drop table if exists position_vector;
CREATE TABLE position_vector(
position_id String comment "position_id",
region String comment "region",
position_vector ARRAY<double> comment "keyword")
row format delimited fields terminated by "\t" collection items terminated by ',';
Create word2vec.ipynb to compute the position profiles and position similarities
See the word2vec.ipynb code; a sketch of the Word2Vec step follows
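A minimal sketch of the Word2Vec step, reusing the assumed words_df from the TF-IDF sketch; the real word2vec.ipynb additionally weights word vectors by the extracted keywords, which is omitted here:

from pyspark.ml.feature import Word2Vec

# train word vectors over the segmented position texts
w2v = Word2Vec(vectorSize=100, minCount=3, inputCol="words", outputCol="vector")
w2v_model = w2v.fit(words_df)
w2v_model.write().overwrite().save("/lgns/lg_models/w2v.model")

# Word2VecModel.transform averages the word vectors of each document, giving
# one profile vector per position for the position_vector table
vectors_df = w2v_model.transform(words_df).selectExpr(
    "position_id", "region", "vector as position_vector")

# pairwise cosine similarity between these vectors is then written to the
# HBase table position_similar created below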
Create the HBase table
disable 'position_similar'
drop 'position_similar'
create 'position_similar', 'similar'
# Storage format: the row key is the position_id, the column is 'similar:position_id', and the value is the similarity score
put 'position_similar', '1', 'similar:2', 0.34
put 'position_similar', '1', 'similar:3', 0.267
put 'position_similar', '1', 'similar:4', 0.56
put 'position_similar', '1', 'similar:5', 0.7
put 'position_similar', '1', 'similar:6', 0.819
put 'position_similar', '1', 'similar:8', 0.28
hbase thrift start -p 9090
The Thrift server must be running before clients (such as happybase) can connect to HBase; a connection sketch follows.
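A minimal sketch of talking to position_similar through the Thrift server with happybase (installed earlier); the host is assumed to be the node where the Thrift server was started:

import happybase

pool = happybase.ConnectionPool(size=3, host="linux122", port=9090)  # host is an assumption
with pool.connection() as conn:
    similar = conn.table("position_similar")
    similar.put(b"1", {b"similar:2": b"0.34"})   # values are stored as byte strings
    print(similar.row(b"1"))                     # e.g. {b'similar:2': b'0.34', ...}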
2.1.8 Building the user profile
Create the HBase table
create 'user_profile', 'basic','user_reference','env'
Create the user_profile file
See the user_profile code; a hedged sketch of writing a profile row follows
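As an illustration only, a single profile row could be written into the table above with happybase as below; the column qualifiers, and whether the real user_profile code writes through HBase directly or through the Hive mapping, are assumptions:

import happybase

conn = happybase.Connection(host="linux122", port=9090)  # host is an assumption
profile = conn.table("user_profile")
profile.put(b"1001", {                      # row key = user_id
    b"basic:gender": b"male",               # hypothetical qualifiers
    b"basic:age": b"28",
    b"user_reference:keywords": b"spark,python,recommendation",
    b"env:platform": b"android",
})
conn.close()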
The code may fail with an error that Hive cannot connect to HBase; add the required JAR under
cd $HIVE_HOME/lib/
Upload hive-hbase-handler-2.3.7.jar
Create the Hive table
drop table if exists user_profile_hbase;
create external table user_profile_hbase(
user_id STRING comment "userID",
basic map<string, String> comment "user basic information",
user_reference map<string, String> comment "user_reference",
env map<string, String> comment "user env")
COMMENT "user profile table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,basic:,user_reference:,env:")
TBLPROPERTIES ("hbase.table.name" = "user_profile");
這里因?yàn)榘姹静黄ヅ洌詿o法推進(jìn),為了節(jié)省時間,直接先略過
停止全部spark的命令
yarn application -list | grep -i spark | awk '{print $1}' | xargs -I {} yarn application -kill {}
2.1.9 Recall and ranking
Create the HBase tables
create 'lg_recall', {NAME=>'als', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'content', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'online', TTL=>1296000, VERSIONS=>999999}
# Examples:
put 'lg_recall', 'recall:user:5', 'als:1',[45,3,5,10]
put 'lg_recall', 'recall:user:5', 'als:1',[289,11,65,52,109,8]
put 'lg_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]
put 'lg_recall', 'recall:user:2', 'content:1',[45,3,5,10,289,11,65,52,109,8]
put 'lg_recall', 'recall:user:2', 'content:2',[1,2,3,4,5,6,7,8,9,10]
create 'history_recall', {NAME=>'recall', TTL=>3888000, VERSIONS=>999999}
put 'history_recall', 'userid1', 'recall:history',[1,2,3]
put 'history_recall', 'userid1', 'recall:history',[4,5,6,7]
put 'history_recall', 'userid1', 'recall:history',[8,9,10]
Create the AlsRecall file, which performs per-user recall
See the AlsRecall code; a sketch of the ALS step follows
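A minimal sketch of the ALS part, assuming a ratings-like DataFrame ratings_df with numeric columns (user_id, position_id, score) built from the user actions; ALS requires numeric ids:

from pyspark.ml.recommendation import ALS

als = ALS(userCol="user_id", itemCol="position_id", ratingCol="score",
          rank=10, regParam=0.01, coldStartStrategy="drop")
als_model = als.fit(ratings_df)

# top-20 candidate positions per user; each row is (user_id, [(position_id, rating), ...])
recall_df = als_model.recommendForAllUsers(20)

# the candidate lists are then written to the 'als' column family of lg_recall
# (and appended to history_recall) via happybase, as in the earlier sketches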
Create the LRRank file, which performs content-based recall
See the LRRank code; a sketch of the content-based lookup follows
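A hedged sketch of the content-based side: for positions a user recently applied to, look up similar positions from position_similar and keep the best candidates; the helper name and host are assumptions:

import happybase

def content_recall(recent_position_ids, topk=10):
    conn = happybase.Connection(host="linux122", port=9090)  # host is an assumption
    similar = conn.table("position_similar")
    candidates = set()
    for pid in recent_position_ids:
        row = similar.row(str(pid).encode())
        # keep the topk most similar positions for each seed position
        ranked = sorted(row.items(), key=lambda kv: float(kv[1]), reverse=True)[:topk]
        candidates.update(int(col.split(b":")[1]) for col, _ in ranked)
    conn.close()
    return candidates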
2.1.10 Recommendation workflow
Windows environment
# Create a new environment named 'spark-env' with Python 3.7
conda create -n spark-env python=3.7
# Activate the new environment
conda activate spark-env
conda install grpcio-tools
conda install grpcio
conda install pyspark==2.4.5
conda install happybase==1.2.0
conda install redis
# First switch to the E: drive
E:
# Then change into the target directory
cd mysource\pyspark_test\com\abtest
Compile and generate the gRPC code
python -m grpc_tools.protoc -I. --python_out=.. --grpc_python_out=.. reco.proto
Create the HBase tables
create 'ctr_user_feature', 'user_weigths'
create 'ctr_position_feature', 'position_weigths'
Create the feature_process file
See the feature_process code; a hedged sketch of the feature write-out follows
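As an illustration, the per-user weights that feature_process produces could be stored into these tables with happybase roughly as below; the qualifier name, the comma-joined serialization and the host are assumptions (the 'user_weigths' family name follows the create command above):

import happybase

def save_user_feature(user_id, weights):
    # weights: list of floats for the CTR model, serialized as a comma-joined string
    conn = happybase.Connection(host="linux122", port=9090)  # host is an assumption
    table = conn.table("ctr_user_feature")
    table.put(str(user_id).encode(),
              {b"user_weigths:weights": ",".join(map(str, weights)).encode()})
    conn.close()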