<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析] 從FlatMap用法到Flink的內部實現

      [源碼分析] 從FlatMap用法到Flink的內部實現

      0x00 摘要

      本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。

      0x01 Map vs FlatMap

      首先我們先從概念入手。

      自從響應式編程慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,后臺也能見到;安卓里面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。

      map

      它把數組流中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的數組流。然后返回這個新數據流。

      flatMap

      flat是扁平的意思。所以這個操作是:先映射(map),再拍扁(join)。

      flatMap輸入可能是多個子數組流。所以flatMap先針對 每個子數組流的每個元素進行映射操作。然后進行扁平化處理,最后匯集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。

      flatMap與map另外一個不一樣的地方就是傳入的函數在處理完后返回值必須是List。

      實例

      比如拿到一個文本文件之后,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那么應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。

      如果還不清楚,可以看看下面這個例子:

      梁山新進一批好馬,準備給每個馬軍頭領配置一批。于是得到函數以及頭領名單如下:
      
      函數 = ( 頭領 => 頭領 + 好馬 )
      五虎將 = List(關勝、林沖、秦明、呼延灼、董平 )
      八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 )
      
      // Map函數的例子
      利用map函數,我們可以得到 五虎將馬軍
      
      五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 )
      結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 )
      
      // flatMap函數的例子
      但是為了得到統一的馬軍,則可以用flatMap:
      
      馬軍頭領 = List(五虎將,八驃騎)
      馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 
      
      結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )
      

      現在大家應該清楚了吧。接下來看看幾個FlatMap的實例。

      Scala語言的實現

      Scala本身對于List類型就有map和flatMap操作。舉例如下:

      val names = List("Alice","James","Apple")
      val strings = names.map(x => x.toUpperCase)
      println(strings)
      // 輸出 List(ALICE, JAMES, APPLE)
      
      val chars = names.flatMap(x=> x.toUpperCase())
      println(chars)
      // 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
      

      Flink的例子

      以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。

      網上常見的一個Flink應用的例子:

      //加載數據源
      val source = env.fromElements("china is the best country","beijing is the capital of china")
      
      //轉化處理數據
      val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
      

      Flink源碼中的例子

      case class WordWithCount(word: String, count: Long)
      
      val text = env.socketTextStream(host, port, '\n')
      
      val windowCounts = text.flatMap { w => w.split("\\s") }
        .map { w => WordWithCount(w, 1) }
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .sum("count")
      
      windowCounts.print()
      

      上面提到的都是簡單的使用,如果有復雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義算子。

      函數類FlatMapFunction

      對于不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下:

      @Public
      @FunctionalInterface
      public interface FlatMapFunction<T, O> extends Function, Serializable {
      	void flatMap(T value, Collector<O> out) throws Exception;
      }
      

      如何自定義算子呢,這個可以直接看看Flink中的官方例子

      // FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
      public class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) {
          for (String token : value.split("\\W")) {
            out.collect(token);
          }
        }
      }
      
      // [...]
      DataSet<String> textLines = // [...]
      DataSet<String> words = textLines.flatMap(new Tokenizer());
      

      Rich函數類RichFlatMapFunction

      對于涉及到狀態的情況,用戶可以使用繼承 RichFlatMapFunction 類的方式來實現UDF。

      RichFlatMapFunction屬于Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增加了Rich前綴,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函數類,Rich函數類增加了:

      • open()方法:Flink在算子調用前會執行這個方法,可以用來進行一些初始化工作。
      • close()方法:Flink在算子最后一次調用結束后執行這個方法,可以用來釋放一些資源。
      • getRuntimeContext方法:獲取運行時上下文。每個并行的算子子任務都有一個運行時上下文,上下文記錄了這個算子運行過程中的一些信息,包括算子當前的并行度、算子子任務序號、廣播數據、累加器、監控數據。最重要的是,我們可以從上下文里獲取狀態數據

      FlatMap對應的RichFlatMapFunction如下:

      @Public
      public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
      	@Override
      	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
      }
      

      其基類 AbstractRichFunction 如下,可以看到主要是和運行時上下文建立了聯系,并且有初始化和退出操作

      @Public
      public abstract class AbstractRichFunction implements RichFunction, Serializable {
        
      	private transient RuntimeContext runtimeContext;
      
      	@Override
      	public void setRuntimeContext(RuntimeContext t) {
      		this.runtimeContext = t;
      	}
      
      	@Override
      	public RuntimeContext getRuntimeContext() {
      			return this.runtimeContext;
      	}
      
      	@Override
      	public IterationRuntimeContext getIterationRuntimeContext() {
          if (this.runtimeContext instanceof IterationRuntimeContext) {
      			return (IterationRuntimeContext) this.runtimeContext;
      		} 
      	}
      
      	@Override
      	public void open(Configuration parameters) throws Exception {}
      
      	@Override
      	public void close() throws Exception {}
      }
      

      如何最好的使用? 當然還是官方文檔和例子最靠譜。

      因為涉及到狀態,所以如果使用,你必須創建一個 StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,并且它們必須具有唯一的名稱以便可以引用它們),狀態所持有值的類型,并且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

      狀態通過 RuntimeContext 進行訪問,因此只能在 rich functions 中使用。 但是我們也會看到一個例子。RichFunctionRuntimeContext 提供如下方法:

      • ValueState getState(ValueStateDescriptor)
      • ReducingState getReducingState(ReducingStateDescriptor)
      • ListState getListState(ListStateDescriptor)
      • AggregatingState getAggregatingState(AggregatingStateDescriptor)
      • FoldingState getFoldingState(FoldingStateDescriptor)
      • MapState getMapState(MapStateDescriptor)

      下面是一個 FlatMapFunction 的例子,展示了如何將這些部分組合起來:

      class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
      
        private var sum: ValueState[(Long, Long)] = _
      
        override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
      
          // access the state value
          val tmpCurrentSum = sum.value
      
          // If it hasn't been used before, it will be null
          val currentSum = if (tmpCurrentSum != null) {
            tmpCurrentSum
          } else {
            (0L, 0L)
          }
      
          // update the count
          val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
      
          // update the state
          sum.update(newSum)
      
          // if the count reaches 2, emit the average and clear the state
          if (newSum._1 >= 2) {
            out.collect((input._1, newSum._2 / newSum._1))
            sum.clear()
          }
        }
      
        override def open(parameters: Configuration): Unit = {
          sum = getRuntimeContext.getState(
            new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
          )
        }
      }
      
      object ExampleCountWindowAverage extends App {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
      
        env.fromCollection(List(
          (1L, 3L),
          (1L, 5L),
          (1L, 7L),
          (1L, 4L),
          (1L, 2L)
        )).keyBy(_._1)
          .flatMap(new CountWindowAverage())
          .print()
        // the printed output will be (1,4) and (1,5)
      
        env.execute("ExampleManagedState")
      }
      

      這個例子實現了一個簡單的計數窗口。 我們把元組的第一個元素當作 key(在示例中都 key 都是 “1”)。 該函數將出現的次數以及總和存儲在 “ValueState” 中。 一旦出現次數達到 2,則將平均值發送到下游,并清除狀態重新開始。 請注意,我們會為每個不同的 key(元組中第一個元素)保存一個單獨的值。

      0x03 從Flink源碼入手看FlatMap實現

      FlatMap從Flink編程模型角度講屬于一個算子,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎么實現的呢? 或者說FlatMap是怎么從用戶代碼轉換到Flink運行時呢 ?

      1. DataSet

      首先說說 DataSet相關這套系統中FlatMap的實現。

      請注意,DataSteam對應的那套系統中,operator名字都是帶著Stream的,比如StreamOperator。

      DataSet

      val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 這段代碼調用的就是DataSet中的API。具體如下:

      public abstract class DataSet<T> {
        
      	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
          
      		String callLocation = Utils.getCallLocationName();
          
      		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
      		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
      	}
      }
      

      FlatMapOperator

      可以看出,flatMap @ DataSet 主要就是生成了一個FlatMapOperator,這個可以理解為是邏輯算子。其定義如下:

      public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {
      
      	protected final FlatMapFunction<IN, OUT> function;
      	protected final String defaultName;
      
      	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
      		super(input, resultType);
      		this.function = function;
      		this.defaultName = defaultName;
      	}
      
      	@Override
      	protected FlatMapFunction<IN, OUT> getFunction() {
      		return function;
      	}
      
        // 這個translateToDataFlow就是生成計劃(Plan)的關鍵代碼
      	@Override
      	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
      		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
      		// create operator
      		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
      			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
      		// set input
      		po.setInput(input);
      		// set parallelism
      		if (this.getParallelism() > 0) {
      			// use specified parallelism
      			po.setParallelism(this.getParallelism());
      		} else {
      			// if no parallelism has been specified, use parallelism of input operator to enable chaining
      			po.setParallelism(input.getParallelism());
      		}
      		return po;
      	}
      }
      

      FlatMapOperator的基類如下:

      public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {
      
      }
      
      // Base class for operations that operates on a single input data set.
      public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {
        	private final DataSet<IN> input;
      }
      

      生成計劃

      DataSet API所編寫的批處理程序跟DataStream API所編寫的流處理程序在生成作業圖(JobGraph)之前的實現差別很大。流處理程序是生成流圖(StreamGraph),而批處理程序是生成計劃(Plan)并由優化器對其進行優化并生成優化后的計劃(OptimizedPlan)。

      計劃(Plan)以數據流(dataflow)的形式來表示批處理程序,但它只是批處理程序最初的表示,在一個批處理程序生成作業圖之前,計劃還會被進行優化以產生更高效的方案。Plan不同于流圖(StreamGraph),它以sink為入口,因為一個批處理程序可能存在若干個sink,所以Plan采用集合來存儲它。另外Plan還封裝了批處理作業的一些基本屬性:jobId、jobName以及defaultParallelism等。

      生成Plan的核心部件是算子翻譯器(OperatorTranslation),createProgramPlan方法通過它來”翻譯“出計劃,核心代碼如下

      public class OperatorTranslation {
        
         // 接收每個需遍歷的DataSink對象,然后將其轉換成GenericDataSinkBase對象
         public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
             List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
             //遍歷sinks集合
             for (DataSink<?> sink : sinks) {
                  //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每個sink進行”翻譯“
                  planSinks.add(translate(sink));
              }
             //以planSins集合構建Plan對象
             Plan p = new Plan(planSinks);
             p.setJobName(jobName);
             return p;
          }
      
      	private <I, O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
          //會調用到 FlatMapOperator 的 translateToDataFlow
      	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    
        }
        
      }
      

      FlatMapOperatorBase就是生成的plan中的一員。

      public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
      	@Override
      	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
      		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
      		
      		FunctionUtils.setFunctionRuntimeContext(function, ctx);
      		FunctionUtils.openFunction(function, parameters);
      
      		ArrayList<OUT> result = new ArrayList<OUT>(input.size());
      
      		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
      		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
      
      		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
      
      		for (IN element : input) {
      			IN inCopy = inSerializer.copy(element);
      			function.flatMap(inCopy, resultCollector);
      		}
      
      		FunctionUtils.closeFunction(function);
      
      		return result;
      	}
      }
      

      而最后優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。

      public class GraphCreatingVisitor implements Visitor<Operator<?>> {
      	public boolean preVisit(Operator<?> c) {
          else if (c instanceof FlatMapOperatorBase) {
      			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
      		}
        }
      }
      

      自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。

      作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

      在運行狀態下,如果上游有數據流入,則FlatMap這個算子就會發揮作用。

      2. DataStream

      對于DataStream,則是另外一套體系結構。首先我們找一個使用DataStream的例子看看。

      //獲取數據: 從socket中獲取
      val textDataStream = env.socketTextStream("127.0.0.1", 8888, '\n')
      val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))
      
      //groupby: 按照指定的字段聚合
      val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
      windowDstram.sum("count").print()
      

      上面例子中,flatMap 調用的是DataStream中的API,具體如下:

      public class DataStream<T> {
        
      	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
          //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就可以進行序列化
          //getType用來獲取類對象的輸出類型
      		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
      				getType(), Utils.getCallLocationName(), true);
      		return flatMap(flatMapper, outType);
      	}
        
        // 構建了一個StreamFlatMap的Operator
      	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
      		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
      	}  
        
        // 依次調用下去
      	@PublicEvolving
      	public <R> SingleOutputStreamOperator<R> transform(
      			String operatorName,
      			TypeInformation<R> outTypeInfo,
      			OneInputStreamOperator<T, R> operator) {
      		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
      	}
        
      	protected <R> SingleOutputStreamOperator<R> doTransform(
      			String operatorName,
      			TypeInformation<R> outTypeInfo,
      			StreamOperatorFactory<R> operatorFactory) {
      		// read the output type of the input Transform to coax out errors about MissingTypeInfo
      		transformation.getOutputType();
          // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。
      		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
      				this.transformation,
      				operatorName,
      				operatorFactory,
      				outTypeInfo,
      				environment.getParallelism());
      		@SuppressWarnings({"unchecked", "rawtypes"})
      		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
          // 將這Transform對象放入env的transform對象列表。
      		getExecutionEnvironment().addOperator(resultTransform);
          // 返回流
      		return returnStream;
      	}  
      }
      

      上面源碼中的幾個概念需要澄清。

      Transformation:首先,FlatMap在FLink編程模型中是算子API,在DataStream中會生成一個Transformation,即邏輯算子。

      邏輯算子Transformation最后會對應到物理算子Operator,這個概念對應的就是StreamOperator

      StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。

      processElement()方法也是UDF的邏輯被調用的地方,例如FlatMapFunction里的flatMap()方法。

      public class StreamFlatMap<IN, OUT>
      		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
      		implements OneInputStreamOperator<IN, OUT> {
      
      	private transient TimestampedCollector<OUT> collector;
      
      	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
      		super(flatMapper);
      		chainingStrategy = ChainingStrategy.ALWAYS;
      	}
      
      	@Override
      	public void open() throws Exception {
      		super.open();
      		collector = new TimestampedCollector<>(output);
      	}
      
      	@Override
      	public void processElement(StreamRecord<IN> element) throws Exception {
      		collector.setTimestamp(element);
          // 調用用戶定義的FlatMap
      		userFunction.flatMap(element.getValue(), collector);
      	}
      }
      

      我們可以看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。

      public abstract class AbstractStreamOperator<OUT>
      		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
      }
      

      StreamOperator是根接口。對于 Streaming 來說所有的算子都繼承自 StreamOperator。繼承了StreamOperator的擴展接口則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。

      從 API 到 邏輯算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。

      作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

      0x04 參考

      Flink中richfunction的一點小作用

      【淺顯易懂】scala中map與flatMap的區別

      Working with State

      flink簡單應用: scala編寫wordcount

      【Flink】Flink基礎之實現WordCount程序(Java與Scala版本)

      Flink進階教程:以flatMap為例,如何進行算子自定義

      Flink運行時之批處理程序生成計劃

      posted @ 2020-03-30 21:15  羅西的思考  閱讀(5187)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 日本一区二区精品色超碰| 精品国产精品中文字幕| 久久精品一本到99热免费| 东京热人妻中文无码| 国产精品成人av电影不卡| 欧美亚洲一区二区三区在线| 亚洲精品乱码久久久久久蜜桃| 国产99久久精品一区二区| 99久久无码私人网站| 成人区人妻精品一区二蜜臀 | 成人精品自拍视频免费看| 金寨县| 亚洲综合精品第一页| 一个人免费观看WWW在线视频| 女人喷液抽搐高潮视频| 精品久久久久久国产| 精品人妻伦一二三区久久aaa片| 精品国产乱码久久久人妻| 增城市| 亚洲狼人久久伊人久久伊| AV人摸人人人澡人人超碰| 亚洲熟女精品一区二区| 亚洲精品国产av成人网| 在线观看无码av五月花| 国产一区二区三区色视频| 成人做受120秒试看试看视频| 视频二区中文字幕在线| 老师破女学生处特级毛ooo片| 亚洲综合一区二区三区不卡| 深夜宅男福利免费在线观看| 永久免费无码av网站在线观看| 亚洲av永久一区二区| 大地资源免费视频观看| 欧美成人午夜在线观看视频| 国产女人叫床高潮大片| 国产午夜在线观看视频播放 | 亚洲久悠悠色悠在线播放| 国模一区二区三区私拍视频| 少妇办公室好紧好爽再浪一点| 久久经精品久久精品免费观看| 亚洲一区二区三区小蜜桃|