<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      pyFlink 入門總結

      一 整體流程

      1. 初始化pyFlink執行環境
      2. 加載數據集
      3. 執行數據分析
      4. 導出分析結果
       
      二 初始化執行環境

      2.1 初始化

      參考代碼如下
      from pyflink.table import EnvironmentSettings, StreamTableEnvironment
      es = EnvironmentSettings.new_instance().in_batch_mode().build()
      tv = StreamTableEnvironment.create(environment_settings=es)

      2.2 其它

      待補充其它初始化方法,如流處理等

      三 加載數據集

      3.1 基于變量

      參考代碼如下
      data = [['T1', 34, 'XY'],['T2', 34, 'NY'],['T3', 33, 'XX'],['T4', 33, 'JZ'],['T5', 33, 'SZ'],['T6', 33, 'PDS'],['T7', 32, 'XC'],['T8', 32, 'NY']]
      tbl = tv.from_elements(data, ['name','age','city'],['STRING','INT','STRING'])
      tv.create_temporary_view('itable', tbl)  # 注冊為flinksql能訪問的對向

      3.2 基于pandas.DataFrame

      dfa = pd.DataFrame(data, columns='name age city'.split())
      tbl = tv.from_pandas(dfa) 

      3.3 基于csv

      csv_path = 'iexample.csv'
      csv_schema = 'name string, age int, city string'
      csv_sql = F"create table iTable({csv_schema}) with ('connector' = 'filesystem', 'path' = '{csv_path}', 'format' = 'csv'))"
      tv.execute_sql(csv_sql)
      tbl = tv.from_path('iTable')
      經驗1 包含表頭的csv會報錯
      經驗2 csv_sql 后面的with 中的引號必須是 單引號 雙引號會報錯
      經驗3 不要創建重名的表,會報錯 此條有待進一步確認
       

      3.4 連接postgresql

      from pyflink.table import EnvironmentSettings, StreamTableEnvironment
      es = EnvironmentSettings.new_instance().in_batch_mode().build()
      tv = StreamTableEnvironment.create(environment_settings=es)
       
      pg_schema = 'name STRING, age INT, city string'
      dsn = F'jdbc:postgresql://{host}:{port}/{database}'
      pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
      print(pg_sql)
      tv.execute_sql(pg_sql)
      tbl = tv.from_path('pg_table')
      tbl.limit(5).execute().print()
      經驗4 需要下載flink-connector-jdbc-*.jar文件 和 postgresql-*.jar文件 對應目錄是pyflink安裝目錄的 lib文件夾下
      經驗5 連接依賴文件與數據庫版本也有關系

      四 數據處理

      4.1 簡單處理

      1) select

      from pyflink.table.expressions import col, call
      tt = tbl.select(col("city"))
      tt.limit(3).execute().print()

      2)group_by

      tbl.group_by(col('city')).select(col('city'),call("count", col('city')).alias('cnt')).execute().print()
      tv.register_table('itable', tbl)
      tv.sql_query('select city, count(*)cnt from itable group by city').execute().print()

      3)order_by

      tbl.order_by(col('age').desc).execute().print()

      4)buildin function

      tbl.select(call('avg',col('age')).alias('age_avg')).execute().print()
      tbl.select(call('sum',col('age')).alias('age_sum')).execute().print()

      5)normalized

      
      
      @udf(result_type='Row<_name string, _age float>', func_type='pandas')
      def inormal(data: pd.DataFrame) -> pd.DataFrame:
          data['_age'] = (data['age']-data['age'].mean())/data['age'].std()
          return data[['name', '_age']]
      tbl.map(inormal).execute().print()
      
      
      

      6)map & udf

      
      
      @udf(result_type='Row<_name string, province string>', func_type='pandas')
      def itown(data: pd.DataFrame) -> pd.DataFrame:
          data['province'] = data.city.apply(lambda x: dct.get(x))
          return data[['name', 'province']]
      tbl.map(itown).execute().print()
      
      
      

      4.2 其它處理

      待補充

      五 輸出

      5.1 print

      tbl.map(itown).execute().print()

      5.2 CSV

      # 定義輸出 CSV 文件的 schema
      sink_schema = "name STRING, age int, _age float, city string, province string"
      # 定義輸出 CSV 文件的目錄
      sink_path = "tmpfile"
      # 注冊輸出表
      tv.execute_sql(f"""
          CREATE TABLE sink_table (
              {sink_schema}
          ) WITH (
              'connector' = 'filesystem',
              'path' = '{sink_path}',
              'format' = 'csv'
          )
      """)
      
      _age = tbl.map(inormal)
      _town = tbl.map(itown)
      t1 = tbl.join(_age).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'))
      t2 = t1.join(_town).where(col('name')==col('_name')).select(col('name'), col('age'), col('_age'), col('city'), col('province'))
      tv.create_temporary_view('jtable', t2)  # old api tv.register_table
      
      # 執行查詢并將結果寫入輸出表
      sql = 'INSERT INTO sink_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
      tv.execute_sql(sql).wait()
       

      經驗6 csv輸出路徑只能指定目錄 不能指定名稱

      經驗7 數據庫輸出需要提前創建供寫入的表

      5.3 POSTGRESQL

      
      
      pg_schema = "name STRING, age int, _age float, city string, province string"
      dsn = F'jdbc:postgresql://{host}:{port}/{database}'
      pg_sql = F"create table pg_table ({pg_schema})with ('connector'='jdbc','url'='{dsn}','table-name'='{table_name}','driver'='org.postgresql.Driver','username'='{user}','password'='{pwd}')"
      tv.execute_sql(pg_sql)  # create
      
      sql = 'INSERT INTO pg_table(name, age, _age, city, province) SELECT name, age, _age, city, province FROM jtable'
      tv.execute_sql(sql).wait() 




      posted @ 2024-05-13 10:14  ddzhen  閱讀(703)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲精品男男一区二区| 体态丰腴的微胖熟女的特征| 亚洲第一成年免费网站| 我要看亚洲黄色太黄一级黄| 天堂V亚洲国产V第一次| 99久久亚洲综合精品成人网| 伊吾县| 国产精品妇女一区二区三区 | 精品国产乱码久久久久久口爆网站| 激情国产一区二区三区四区| 性色欲情网站iwww九文堂| 久热综合在线亚洲精品| 国产精品∧v在线观看| av深夜免费在线观看| 色天使亚洲综合一区二区| 亚洲人成色99999在线观看| 视频二区中文字幕在线| 精品国产线拍大陆久久尤物| 精品少妇av蜜臀av| 少妇被粗大的猛烈进出69影院一| 亚洲爆乳WWW无码专区| 河津市| 美女又黄又免费的视频| 人妻18毛片A级毛片免费看| 日本高清一区免费中文视频| 97se亚洲综合自在线| 免费无码又爽又刺激高潮的app| 国产不卡av一区二区| 国产一级特黄高清大片一| 庆阳市| 亚洲真人无码永久在线| 国产极品精品自在线不卡| 国精品午夜福利视频| 久久中文字幕国产精品| 亚洲中文无码永久免费| 欧洲人与动牲交α欧美精品| 铁力市| 久久亚洲精品无码播放| 久久伊99综合婷婷久久伊| 九九热精品在线观看| yw尤物av无码国产在线观看|