
      AWS Redshift + Glue + S3 Data ETL Summary

      I recently worked on a case of migrating a data warehouse from Azure to AWS, and have been meaning to write up the tools and methods I encountered along the way.

      Original architecture:

      Target architecture:

      On AWS we built a data platform based on S3 + AWS Glue + Redshift: S3 serves as the data lake storage, Glue is the ETL/ELT tool, and Redshift, a PostgreSQL-based columnar store, serves as the data warehouse.

      On this stack we built a demo implementing the following scenario:

      Fetch weather information for a specified region from a weather API, apply some simple processing (ETL), and store it in a designated Redshift table.

      Steps:

      1. Provision resources and grant the required IAM permissions

        Resources: S3, Glue, Redshift. Create the S3 prefix pocs3inputdev/weather_info/ (one way to script this is sketched below).
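
        A minimal boto3 sketch for creating that prefix (S3 has no real directories; an empty object with a trailing slash is one common way to make the prefix show up as a folder in the console):

      import boto3
      
      # Assumes the bucket pocs3inputdev already exists in your account/region.
      s3 = boto3.client('s3')
      
      # Empty marker object; job1 will later write weather_info/weather_data.csv here.
      s3.put_object(Bucket='pocs3inputdev', Key='weather_info/')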

      Create the Redshift table:

      CREATE TABLE weatherdata (
          city          VARCHAR(256)   ENCODE ZSTD,
          province      VARCHAR(256)   ENCODE ZSTD,
          adcode        VARCHAR(64)    ENCODE ZSTD,
          timestamp     TIMESTAMP      ENCODE DELTA32K,
          weather       VARCHAR(256)   ENCODE ZSTD,
          temperature   VARCHAR(64)    ENCODE ZSTD,
          winddirection VARCHAR(128)   ENCODE ZSTD,
          windpower     VARCHAR(64)    ENCODE ZSTD,
          humidity      VARCHAR(64)    ENCODE ZSTD,
          reporttime    VARCHAR(256)   ENCODE ZSTD
      )
      DISTKEY(adcode)
      SORTKEY(timestamp);

        PS: Pay attention to the Redshift DDL specifics, mainly DISTKEY, SORTKEY, and the per-column compression encodings (ENCODE; these are compression settings, not encryption). A sketch for checking the applied encodings follows.
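
        One way to verify the encodings actually applied (a sketch, assuming a provisioned cluster reachable through the Redshift Data API; cluster, database, and user names are placeholders):

      import boto3
      
      client = boto3.client('redshift-data')
      
      # Ask Redshift which encoding each column of weatherdata actually uses.
      resp = client.execute_statement(
          ClusterIdentifier='my-redshift-cluster',  # placeholder
          Database='dev',                           # placeholder
          DbUser='awsuser',                         # placeholder
          Sql='SELECT "column", type, encoding FROM pg_table_def '
              "WHERE tablename = 'weatherdata';",
      )
      # Poll client.get_statement_result(Id=resp['Id']) for the rows.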

        Permissions: the Glue role needs S3 PUT/DELETE access; for this PoC it was given full access to S3 and full access to Redshift (a scripted sketch below).
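
        For a throwaway PoC this was done with the broad managed policies; a hedged boto3 sketch (the role name is a placeholder, and note that in China regions such as cn-northwest-1 the ARN partition is arn:aws-cn):

      import boto3
      
      iam = boto3.client('iam')
      ROLE = 'glue-poc-role'  # placeholder: the role the Glue jobs run as
      
      # Broad managed policies, acceptable for a short-lived PoC only;
      # scope these down for anything real.
      for arn in (
          'arn:aws:iam::aws:policy/AmazonS3FullAccess',
          'arn:aws:iam::aws:policy/AmazonRedshiftFullAccess',
          'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole',
      ):
          iam.attach_role_policy(RoleName=ROLE, PolicyArn=arn)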

      2. All ETL tables are managed in the Glue Data Catalog, so the tables need to be created in Glue first

        Create the database dw-pbi-poc-glue-db

        Create the table s3_weather_info via "Add table": define the columns manually and point the table at the S3 prefix s3://pocs3inputdev/weather_info/

        (Alternatively, use "Add tables using crawler": point a crawler at the S3 prefix and it will detect the CSV header and create the table automatically; a scripted sketch follows.)
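
        The crawler can also be created from code; a minimal sketch (crawler and role names are placeholders; the database and S3 path match the PoC):

      import boto3
      
      glue = boto3.client('glue')
      
      # Crawl the weather_info prefix and register the detected CSV schema
      # as a table in the dw-pbi-poc-glue-db database.
      glue.create_crawler(
          Name='weather-info-crawler',  # placeholder
          Role='glue-poc-role',         # placeholder IAM role
          DatabaseName='dw-pbi-poc-glue-db',
          Targets={'S3Targets': [{'Path': 's3://pocs3inputdev/weather_info/'}]},
      )
      glue.start_crawler(Name='weather-info-crawler')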

      3. Create jobs in Glue and assemble a workflow to form the pipeline

        We design two jobs. job1 runs in Python shell mode: it reads the API and stores the data on S3 as CSV.

        job2 reads the data from S3, applies the ETL (simple field mapping, etc.), and writes it to the designated Redshift table.

        job1:

        glue-->ETL jobs-->script editor-->python

      import requests
      import pandas as pd
      import boto3
      from datetime import datetime
      import json
      
      def get_weather_data_and_save_to_s3():
          # API endpoint and parameters
          url = "https://restapi.amap.com/v3/weather/weatherInfo"
          params = {
              'key': 'd84fdf9cc1f19710bb7ff6c3cd924726',
              'city': '110102',
              'extensions': 'base',
              'output': 'JSON'
          }
          
          # Make HTTP GET request
          response = requests.get(url, params=params)
          
          if response.status_code == 200:
              data = response.json()
              
              # Process the weather data
              if data.get('status') == '1' and 'lives' in data:
                  weather_info = data['lives'][0]  # Get the first (and likely only) record
                  
                  # Create a DataFrame with the weather data
                  df = pd.DataFrame([{
                      'timestamp': datetime.now().isoformat(),
                      'province': weather_info.get('province', ''),
                      'city': weather_info.get('city', ''),
                      'adcode': weather_info.get('adcode', ''),
                      'weather': weather_info.get('weather', ''),
                      'temperature': weather_info.get('temperature', ''),
                      'winddirection': weather_info.get('winddirection', ''),
                      'windpower': weather_info.get('windpower', ''),
                      'humidity': weather_info.get('humidity', ''),
                      'reporttime': weather_info.get('reporttime', '')
                  }])
                  
                  # Save CSV to local C drive with UTF-8 BOM encoding for proper Chinese display
                  # local_file_path = f"C:/Users/a765902/Desktop/KGS 材料/spike poc/weather_data.csv"
                  
                  # # Use pandas to_csv with proper encoding
                  # df.to_csv(local_file_path, index=False, encoding='utf-8-sig')
                  # print(f"Successfully saved CSV to local drive: {local_file_path}")
                  csv_buffer = df.to_csv(index=False)
                  
                  try:
                      # Upload the CSV to S3 via boto3
                      s3_client = boto3.client('s3')
                      bucket_name = 'pocs3inputdev'  # Replace with your bucket name
                      #file_name = f"weather_data_{datetime.now().strftime('%Y%m%d')}.csv"
                      file_name = "weather_info/weather_data.csv"
                      
                      # Upload the CSV data to S3
                      s3_client.put_object(
                          Bucket=bucket_name,
                          Key=file_name,
                          Body=csv_buffer,
                          ContentType='text/csv'
                      )
                      
                      print(f"Successfully saved weather data to s3://{bucket_name}/weather_info/{file_name}")
                      return f"s3://{bucket_name}/weather_info/{file_name}"
                  except Exception as e:
                      print(f"Error: {e}")
                      raise
              else:
                  print("Error: Invalid response data from weather API")
                  return None
          else:
              print(f"Error: HTTP {response.status_code} - {response.text}")
              return None
      
      # Execute the function
      if __name__ == "__main__":
          get_weather_data_and_save_to_s3()
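        The script above can also be registered as a Glue job from code rather than through the console; a sketch (job name, role, and script location are placeholders; 'pythonshell' is the command name for Python shell jobs):

      import boto3
      
      glue = boto3.client('glue')
      
      # Register the weather-fetching script as a Python shell job.
      glue.create_job(
          Name='job1-fetch-weather',  # placeholder
          Role='glue-poc-role',       # placeholder IAM role
          Command={
              'Name': 'pythonshell',
              'ScriptLocation': 's3://pocs3inputdev/scripts/job1.py',  # placeholder
              'PythonVersion': '3.9',
          },
          MaxCapacity=0.0625,  # the smaller of the two Python shell capacity settings
      )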

        job2:

        glue-->ETL jobs-->Visual ETL

        Drag and drop the corresponding nodes; the final generated script:

        

      import sys
      from awsglue.transforms import *
      from awsglue.utils import getResolvedOptions
      from pyspark.context import SparkContext
      from awsglue.context import GlueContext
      from awsglue.job import Job
      from awsglue import DynamicFrame
      
      args = getResolvedOptions(sys.argv, ['JOB_NAME'])
      sc = SparkContext()
      glueContext = GlueContext(sc)
      spark = glueContext.spark_session
      job = Job(glueContext)
      job.init(args['JOB_NAME'], args)
      
      # Script generated for node Amazon S3
      AmazonS3_node1754055290874 = glueContext.create_dynamic_frame.from_options(format_options={"quoteChar": "\"", "withHeader": True, "separator": ",", "optimizePerformance": False}, connection_type="s3", format="csv", connection_options={"paths": ["s3://pocs3inputdev/weather_info/weather_data.csv"], "recurse": True}, transformation_ctx="AmazonS3_node1754055290874")
      
      # Script generated for node Rename Field
      RenameField_node1754055306801 = RenameField.apply(frame=AmazonS3_node1754055290874, old_name="winddirection", new_name="direction", transformation_ctx="RenameField_node1754055306801")
      
      # Script generated for node Amazon Redshift
      AmazonRedshift_node1754055312487 = glueContext.write_dynamic_frame.from_options(frame=RenameField_node1754055306801, connection_type="redshift", connection_options={"redshiftTmpDir": "s3://aws-glue-assets-455512573562-cn-northwest-1/temporary/", "useConnectionProperties": "true", "dbtable": "public.weatherdata", "connectionName": "Redshift connection", "preactions": "CREATE TABLE IF NOT EXISTS public.weatherdata (timestamp VARCHAR, province VARCHAR, city VARCHAR, adcode VARCHAR, weather VARCHAR, temperature VARCHAR, direction VARCHAR, windpower VARCHAR, humidity VARCHAR, reporttime VARCHAR);"}, transformation_ctx="AmazonRedshift_node1754055312487")
      
      job.commit()

      Create a workflow and chain job1 and job2 with triggers:


       The trigger can be scheduled daily, weekly, on demand, or with a custom schedule; a scripted sketch of the workflow and triggers follows.
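
       The workflow and triggers can likewise be created from code; a hedged sketch (all names are placeholders) with a daily scheduled trigger for job1 and a conditional trigger that starts job2 once job1 succeeds:

      import boto3
      
      glue = boto3.client('glue')
      
      glue.create_workflow(Name='weather-etl-workflow')  # placeholder name
      
      # Scheduled trigger: start job1 every day at 02:00 UTC.
      glue.create_trigger(
          Name='daily-start',
          WorkflowName='weather-etl-workflow',
          Type='SCHEDULED',
          Schedule='cron(0 2 * * ? *)',
          Actions=[{'JobName': 'job1-fetch-weather'}],  # placeholder job name
          StartOnCreation=True,
      )
      
      # Conditional trigger: start job2 when job1 has succeeded.
      glue.create_trigger(
          Name='job1-to-job2',
          WorkflowName='weather-etl-workflow',
          Type='CONDITIONAL',
          Predicate={'Conditions': [{
              'LogicalOperator': 'EQUALS',
              'JobName': 'job1-fetch-weather',        # placeholder job name
              'State': 'SUCCEEDED',
          }]},
          Actions=[{'JobName': 'job2-etl-to-redshift'}],  # placeholder job name
          StartOnCreation=True,
      )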

       
