<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kafka、RocketMQ選型

       

      1,Kafka 吞吐量高 RocketMQ 可靠性好
      Kafka 異步復(fù)制可以提供較高的吞吐量,但在極端情況下可能會導(dǎo)致數(shù)據(jù)丟失
      2,Kafka不支持分布式事務(wù)消息 RocketMQ支持
      3,Kafka更適合處理海量數(shù)據(jù)流,對數(shù)據(jù)正確性要求不是特別嚴(yán)格的場景,如日志收集、實時分析等。
      RocketMQ更適合對數(shù)據(jù)可靠性、實時性要求較高
      4,Kafka消費失敗不支持重試 RocketMQ消費失敗支持定時重試,每次重試間隔時間順延。
      5,但當(dāng)一臺Broker宕機(jī)后,可能會產(chǎn)生消息亂序的問題 RocketMQ則不會
      
      于需要高吞吐量和低延遲的實時流式數(shù)據(jù)處理,Kafka可能是更好的選擇;而對于需要高可靠性和復(fù)雜消息隊列功能的傳統(tǒng)行業(yè)應(yīng)用,RocketMQ可能是更合適的。RabbitMQ則是一個中庸的選擇,提供了較為全面的消息隊列功能和靈活的消息路由機(jī)制
      
      RocketMQ: 由阿里巴巴維護(hù),有較大的社區(qū)。
      RabbitMQ: 由Pivotal維護(hù),有較大的社區(qū)。
      Kafka: 由Apache維護(hù),社區(qū)活躍,是Apache的頂級項目。
      

        

      另外找一個zk,有客戶端命令的
      .\zkCli.cmd -server 127.0.0.1:2181
      ls /brokers 查看注冊信息
      
      1,kafka不支持分布式事務(wù)消息 不支持消費失敗重試
      2,kafka的單機(jī)TPS能跑到每秒上百萬,是因為Producer端將多個小消息合并,批量發(fā)向broker
      3,RocketMQ寫入性能上不如kafka, 主要因為kafka主要應(yīng)用于日志場景,而RocketMQ應(yīng)用于業(yè)務(wù)場景,為了保證消息必達(dá)犧牲了性能,且基于線上真實場景沒有在RocketMQ層做消息合并,推薦在業(yè)務(wù)層自己做。
      4,沒有“中心主節(jié)點”的概念,集群中所有的服務(wù)器都是對等的,因此,可以在不做任何配置的更改的情況下實現(xiàn)服務(wù)器的的添加與刪除
      https://blog.csdn.net/pengweismile/article/details/117636252
      
      https://kafka.apache.org/quickstart
      .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
      .\bin\windows\kafka-server-start.bat .\config\server.properties
      .\bin\windows\kafka-topics.bat --create --topic topic-xmh --bootstrap-server localhost:9092
      .\bin\windows\kafka-topics.bat --describe --topic topic-xmh --bootstrap-server localhost:9092
      
      .\bin\windows\kafka-console-consumer.bat --topic topic-xmh --from-beginning --bootstrap-server localhost:9092 
      .\bin\windows\kafka-console-producer.bat --topic topic-xmh --bootstrap-server localhost:9092
      
      
      https://blog.csdn.net/syc0616/article/details/118156641
      producer配置
      bootstrap.servers: kafka的地址。
      acks:消息的確認(rèn)機(jī)制,默認(rèn)值是0。
      acks=0:如果設(shè)置為0,生產(chǎn)者不會等待kafka的響應(yīng)。
      acks=1:這個配置意味著kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機(jī)器的成功響應(yīng)。
      acks=all:這個配置意味著leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機(jī)器掛掉。這是最強(qiáng)的可用性保證。
      retries:配置為大于0的值的話,客戶端會在消息發(fā)送失敗時重新發(fā)送。
      batch.size:當(dāng)多條消息需要發(fā)送到同一個分區(qū)時,生產(chǎn)者會嘗試合并網(wǎng)絡(luò)請求。這會提高client和生產(chǎn)者的效率。
      key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
      value.serializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
      
      public class ProMy {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
              props.put("acks", "all");
              props.put("retries", 0);
              props.put("batch.size", 1);//16384
              props.put("key.serializer", StringSerializer.class.getName());
              props.put("value.serializer", StringSerializer.class.getName());
              KafkaProducer producer = new KafkaProducer(props);
              producer.send(new ProducerRecord<String, String>("topic-xmh","xingkey2","xingvalues5"));
              producer.close();
              System.out.println("*************end-procuder");
          }
      }
      
      consumer配置
      bootstrap.servers: kafka的地址。
      group.id:組名 不同組名可以重復(fù)消費。例如你先使用了組名A消費了kafka的1000條數(shù)據(jù),但是你還想再次進(jìn)行消費這1000條數(shù)據(jù),并且不想重新去產(chǎn)生,那么這里你只需要更改組名就可以重復(fù)消費了。
      enable.auto.commit:是否自動提交,默認(rèn)為true。
      auto.commit.interval.ms: 從poll(拉)的回話處理時長。
      session.timeout.ms:超時時間。
      max.poll.records:一次最大拉取的條數(shù)。
      auto.offset.reset:消費規(guī)則,默認(rèn)earliest 。
      earliest: 當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
      latest: 當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) 。
      none: topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常。
      key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
      value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
      
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
              props.put("group.id", "group_x3");
              props.put("enable.auto.commit", "true");
              props.put("auto.commit.interval.ms", "1000");
              props.put("session.timeout.ms", "30000");
      //        props.put("max.poll.records", 1);
              props.put("auto.offset.reset", "earliest");
      //        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
      //        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
              props.put("key.deserializer", StringDeserializer.class.getName());
              props.put("value.deserializer", StringDeserializer.class.getName());
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
              consumer.subscribe(Arrays.asList("topic-xmh"));
              //for(;;) {
                  ConsumerRecords<String, String> msgList = consumer.poll(1000);
      //            System.out.println("consumer*******:" + msgList);
                  for (ConsumerRecord<String, String> record:msgList){
                      System.out.println("**************consumer*******record1:"+record+","+record.key()+","+record.value());
                  }
              //}
      
      //        consumer.close();
          }
      
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.12</artifactId>
          <version>1.0.0</version>
          <scope>provided</scope>
      </dependency>
      
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>1.0.0</version>
      </dependency>
      
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-streams</artifactId>
          <version>1.0.0</version>
      </dependency>
      

       

       

       

      spring-kafka

       

      監(jiān)聽消息
      @KafkaListener(id = "xmh-consumer", topics = "${spring.kafka.consumer.topics}", groupId = "ump",containerFactory = "listenerContainerFactory") public void listenTopics(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment ack) { try { log.info("xmh-consumer**********"+consumerRecord); } catch (Exception e) { log.error("消費失敗******" , e); } finally { ack.acknowledge(); } } @KafkaListeners({@KafkaListener(id = "ump-consumer", idIsGroup = false, topics = "ump-simple", containerFactory = "listenerContainerFactory")}) void icbusListener(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(value = KafkaHeaders.OFFSET) long offset, @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId, @Payload String message, Acknowledgment ack) { // 消費端不再使用本地隊列的方式,在生產(chǎn)端控制消息使用的分區(qū) log.info("**********=====>topic:{},key:{},partitionId:{},offset:{},value:{}", topic, key, partitionId, offset, message); try{ } catch (Exception e) { log.error("consume error:{}", e.getMessage()); } finally { ack.acknowledge(); }

       

        

       

      @Autowired
      private KafkaTemplate<Object, Object> kafkaTemplate;
      @RequestMapping("/send")
      public void sendMultiple() {
      String message = "xmh發(fā)送到Kafka的消息";
      kafkaTemplate.send("xmh-simple","xmhkey", message );
      System.out.println(message );
      }
      
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
                  <version>2.6.8</version>
              </dependency>
      
              <!-- kafka -->
              <dependency>
                  <groupId>org.springframework.kafka</groupId>
                  <artifactId>spring-kafka</artifactId>
                  <version>2.8.8</version>
              </dependency>
      
      
      spring:
        kafka:
          bootstrap-servers: ${KAFKA_LIST:172.x.x.xx:9092}
          producer:
            # 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
            retries: 1
            #當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算。 1000000
            batch-size: 16389
            # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。 33554432
            buffer-memory: 33554432
            # 鍵的序列化方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            # 值的序列化方式
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            # acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。
            # acks=1 : 只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。
            # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
            acks: all
            compression-type: lz4
          consumer:
            # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
            auto-commit-interval: 1S
            group-id: xmh2
      concurrency: 5 #多線程處理 # 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理: # latest(默認(rèn)值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄) # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄 auto-offset-reset: earliest # 是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量 enable-auto-commit: false # 鍵的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 主動拉取消息模式下,每次拉取消息個數(shù),該值默認(rèn)是500 max-poll-records: 10000 properties: # 每次最多獲取100M的數(shù)據(jù) max.partition.fetch.bytes: 104857600 listener: # 在偵聽器容器中運行的線程數(shù)。 concurrency: 5 #listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false @Configuration public class KafkaProviderConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.batch-size}") private String batchSize; @Value("${spring.kafka.producer.buffer-memory}") private String bufferMemory; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(16); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //響應(yīng)模式,我們使用acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。 props.put(ProducerConfig.ACKS_CONFIG, acks); //發(fā)生錯誤后,消息重發(fā)的次數(shù),開啟事務(wù)必須大于0 props.put(ProducerConfig.RETRIES_CONFIG, retries); //當(dāng)多個消息發(fā)送到相同分區(qū)時,生產(chǎn)者會將消息打包到一起,以減少請求交互. 而不是一條條發(fā)送 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); //有的時刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時就很大,所以需要一個參數(shù). 再設(shè)置一個時間,到了這個時間, props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); //生產(chǎn)者內(nèi)存緩沖區(qū)的大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); //序列和消費者對應(yīng) props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //用戶名密碼配置,沒有用戶名密碼可以去掉以下配置 // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); // props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // props.put("java.security.auth.login.config", "10000"); // // 可以在nacos配置文件中配置 // props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";")); return props; } // 生產(chǎn)者工廠 @Bean("kafkaProduceFactory") public ProducerFactory<Object, Object> producerFactory() { DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); factory.setTransactionIdPrefix("kafkaXmh-"); return factory; } // 事務(wù)處理 // 這里的事務(wù)處理會和項目中的其他事務(wù)起沖突,所以我一般會把@Bean去掉,不用spring代理 @Bean("kafkaTransactionManager") @Primary public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { return new KafkaTransactionManager<Object, Object>(producerFactory); } @Bean public KafkaTemplate<Object, Object> kafkaTemplate() { KafkaTemplate template = new KafkaTemplate<>(producerFactory()); template.setProducerListener(new KafkaSendResultHandler()); return template ; } @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.properties.session.timeout.ms:10000}") private String sessionTimeout; @Value("${spring.kafka.properties.max.poll.interval.ms:600000}") private String maxPollIntervalTime; @Value("${spring.kafka.consumer.max-poll-records:3}") private String maxPollRecords; @Value("${spring.kafka.consumer.auto-offset-reset:latest}") private String autoOffsetReset; @Value("${spring.kafka.listener.concurrency:4}") private Integer concurrency; @Value("${spring.kafka.listener.missing-topics-fatal:false}") private boolean missingTopicsFatal; @Value("${spring.kafka.listener.poll-timeout:600000}") private long pollTimeout; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(16); // 服務(wù)器地址,不多說配置直接用 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // groupId不多說,直接用 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); //自動提交的時間間隔,自動提交開啟時生效 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); //該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理: //我們使用latest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); //兩次poll之間的最大間隔,默認(rèn)值為5分鐘。如果超過這個間隔會觸發(fā)reBalance propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); //這個參數(shù)定義了poll方法最多可以拉取多少條消息,默認(rèn)值為500。 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); //當(dāng)broker多久沒有收到consumer的心跳請求后就觸發(fā)reBalance,默認(rèn)值是10s propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); //序列化(我們這邊使用StringDeserializer,與生產(chǎn)者保持一致) propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 下面四個參數(shù)是用戶名密碼的參數(shù),沒有用戶名密碼可以去掉以下配置 // propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); // propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // propsMap.put("java.security.auth.login.config", "10000"); // // 這里username設(shè)置用戶名, password設(shè)置密碼我寫死到代碼里了,可以更改為nacos配置 // propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";")); return propsMap; } // 消費者工廠,將配置信息加載進(jìn)去 @Bean("consumerFactory") public DefaultKafkaConsumerFactory consumerFactory(){ return new DefaultKafkaConsumerFactory(consumerConfigs()); } @Bean("listenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //在偵聽器容器中運行的線程數(shù),一般設(shè)置為 機(jī)器數(shù)*分區(qū)數(shù) factory.setConcurrency(concurrency); //消費監(jiān)聽接口監(jiān)聽的主題不存在時,默認(rèn)會報錯,所以設(shè)置為false忽略錯誤 factory.getContainerProperties().setMissingTopicsFatal(missingTopicsFatal); //自動提交關(guān)閉,需要設(shè)置手動消息確認(rèn) factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } @Component public class KafkaSendResultHandler implements ProducerListener<Object, Object> { @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { System.out.println("消息發(fā)送成功***:" + producerRecord.toString()); } @Override public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { System.out.println("消息發(fā)送失敗***:" + producerRecord.toString() + exception.getMessage()); } @Component public class KafkaConsumerListenerError implements KafkaListenerErrorHandler { @Override @NonNull public Object handleError(Message<?> message, ListenerExecutionFailedException e) { return new Object(); } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { System.out.println("消息詳情:" + message); System.out.println("異常信息::" + exception); System.out.println("消費者詳情::" + consumer.groupMetadata()); System.out.println("監(jiān)聽主題::" + consumer.listTopics()); return KafkaListenerErrorHandler.super.handleError(message, exception, consumer); } /** * 下面的方法可以手動操控kafka的隊列監(jiān)聽情況 * 先發(fā)送一條消息,因為autoStartup = "false",所以并不會看到有消息進(jìn)入監(jiān)聽器。 * 接著啟動監(jiān)聽器,/start/testGroup??梢钥吹接幸粭l消息進(jìn)來了。 * start是開啟監(jiān)聽,stop是關(guān)閉監(jiān)聽 * pause是暫停監(jiān)聽,resume是繼續(xù)監(jiān)聽 * @param listenerId consumer的group-id */ @RequestMapping("/pause/{listenerId}") public void pause(@PathVariable String listenerId) { try { Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).pause(); } catch (Exception e) { e.printStackTrace(); } } @RequestMapping("/resume/{listenerId}") public void resume(@PathVariable String listenerId) { try { Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).resume(); } catch (Exception e) { e.printStackTrace(); } } @RequestMapping("/start/{listenerId}") public void start(@PathVariable String listenerId) { try { Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).start(); } catch (Exception e) { e.printStackTrace(); } } @RequestMapping("/stop/{listenerId}") public void stop(@PathVariable String listenerId) { try { Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).stop(); } catch (Exception e) { e.printStackTrace(); } }

       

      posted @ 2022-08-16 14:51  XUMT111  閱讀(87)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 起碰免费公开97在线视频| 日本真人做爰免费视频120秒| 玩弄丰满少妇人妻视频| 亚洲悠悠色综合中文字幕| 日韩精品视频一二三四区| 熟妇人妻一区二区三区四区| 国产免费午夜福利在线观看 | 日本一区二区三区后入式| 亚洲深深色噜噜狠狠网站| 骚虎视频在线观看| 国产精品视频一区二区不卡 | 欧美综合天天夜夜久久| 亚洲乱码一二三四区| 玩弄丰满少妇人妻视频| 亚州少妇无套内射激情视频| 狠狠综合久久综合88亚洲爱文| 大地资源中文第二页日本| 四虎永久地址www成人| AI做受???高潮AAAA视频| 人人澡人摸人人添| 开江县| 98久久人妻少妇激情啪啪| 亚洲最大成人在线播放| 亚洲性av网站| 国产精品无卡毛片视频| 日韩在线视频线观看一区| 景洪市| 18av千部影片| 激情综合网激情五月我去也| 狠狠躁夜夜躁人人爽天天5| 欧美性猛交xxxx免费看| 国产成人精品无码免费看| 日韩av日韩av在线| 日韩东京热一区二区三区| 国精产品自偷自偷ym使用方法| 377P欧洲日本亚洲大胆| 伊人中文在线最新版天堂| 欧美精品一区二区三区中文字幕| 欧美人成在线播放网站免费| 蜜臀在线播放一区在线播放| 亚洲香蕉伊综合在人在线|