pyFlink 入門總結
一 整體流程
1. 初始化pyFlink執行環境
2. 加載數據集
3. 執行數據分析
4. 導出分析結果
二 初始化執行環境
2.1 初始化
參考代碼如下
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
es = EnvironmentSettings.new_instance().in_batch_mode().build()
tv = StreamTableEnvironment.create(environment_settings=es)
2.2 其它
待補充其它初始化方法,如流處理等
三 加載數據集
3.1 基于變量
參考代碼如下
data = [['T1', 34, 'XY'],['T2', 34, 'NY'],['T3', 33, 'XX'],['T4', 33, 'JZ'],['T5', 33, 'SZ'],['T6', 33, 'PDS'],['T7', 32, 'XC'],['T8', 32, 'NY']]
tbl = tv.from_elements(data, ['name','age','city'],['STRING','INT','STRING'])
tv.create_temporary_view('itable', tbl) # 注冊為flinksql能訪問的對向
3.2 基于pandas.DataFrame
dfa = pd.DataFrame(data, columns='name age city'.split())
tbl = tv.from_pandas(dfa)
3.3 基于csv
csv_path = 'iexample.csv'
csv_schema = 'name string, age int, city string'
csv_sql = F"create table iTable({csv_schema}) with ('connector' = 'filesystem', 'path' = '{csv_path}', 'format' = 'csv'))"
tv.execute_sql(csv_sql)
tbl = tv.from_path('iTable')
經驗1 包含表頭的csv會報錯
經驗2 csv_sql 后面的with 中的引號必須是 單引號 雙引號會報錯
經驗3 不要創建重名的表,會報錯 此條有待進一步確認
3.4 連接postgresql
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
es = EnvironmentSettings.new_instance().in_batch_mode().build()
tv = StreamTableEnvironment.create(environment_settings=es)
pg_schema = 'name STRING, age INT, city string'
dsn = F'jdbc:postgresql://{host}:{port}/{database}'
pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
print(pg_sql)
tv.execute_sql(pg_sql)
tbl = tv.from_path('pg_table')
tbl.limit(5).execute().print()
經驗4 需要下載flink-connector-jdbc-*.jar文件 和 postgresql-*.jar文件 對應目錄是pyflink安裝目錄的 lib文件夾下
經驗5 連接依賴文件與數據庫版本也有關系
四 數據處理
4.1 簡單處理
1) select
from pyflink.table.expressions import col, call
tt = tbl.select(col("city"))
tt.limit(3).execute().print()
2)group_by
tbl.group_by(col('city')).select(col('city'),call("count", col('city')).alias('cnt')).execute().print()
tv.register_table('itable', tbl)
tv.sql_query('select city, count(*)cnt from itable group by city').execute().print()
3)order_by
tbl.order_by(col('age').desc).execute().print()
4)buildin function
tbl.select(call('avg',col('age')).alias('age_avg')).execute().print()
tbl.select(call('sum',col('age')).alias('age_sum')).execute().print()
5)normalized
@udf(result_type='Row<_name string, _age float>', func_type='pandas')
def inormal(data: pd.DataFrame) -> pd.DataFrame:
data['_age'] = (data['age']-data['age'].mean())/data['age'].std()
return data[['name', '_age']]
tbl.map(inormal).execute().print()
6)map & udf
@udf(result_type='Row<_name string, province string>', func_type='pandas')
def itown(data: pd.DataFrame) -> pd.DataFrame:
data['province'] = data.city.apply(lambda x: dct.get(x))
return data[['name', 'province']]
tbl.map(itown).execute().print()
4.2 其它處理
待補充
五 輸出
5.1 print
tbl.map(itown).execute().print()
5.2 CSV
# 定義輸出 CSV 文件的 schema
sink_schema = "name STRING, age int, _age float, city string, province string"
# 定義輸出 CSV 文件的目錄
sink_path = "tmpfile"
# 注冊輸出表
tv.execute_sql(f"""
CREATE TABLE sink_table (
{sink_schema}
) WITH (
'connector' = 'filesystem',
'path' = '{sink_path}',
'format' = 'csv'
)
""")
_age = tbl.map(inormal)
_town = tbl.map(itown)
t1 = tbl.join(_age).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'))
t2 = t1.join(_town).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'), col('province'))
tv.create_temporary_view('jtable', t2) # old api tv.register_table
# 執行查詢并將結果寫入輸出表
sql = 'INSERT INTO sink_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
tv.execute_sql(sql).wait()
經驗6 csv輸出路徑只能指定目錄 不能指定名稱
經驗7 數據庫輸出需要提前創建供寫入的表
5.3 POSTGRESQL
pg_schema = "name STRING, age int, _age float, city string, province string"
dsn = F'jdbc:postgresql://{host}:{port}/{database}'
pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
tv.execute_sql(pg_sql) # create
sql = 'INSERT INTO pg_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
tv.execute_sql(sql).wait()

浙公網安備 33010602011771號