flink将数据录入数据库-创新互联

本篇内容介绍了“flink将数据录入数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联主营济阳网站建设的网络公司,主营网站建设方案,APP应用开发,济阳h5成都小程序开发搭建,济阳网站营销推广欢迎济阳等地区企业咨询
//主类
package flink.streaming
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode
object StreamingTest {
  def main(args: Array[String]): Unit = {
    val kafkaProps = new Properties()
    //kafka的一些属性
    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
    //所在的消费组
    kafkaProps.setProperty("group.id", "group2")
    //获取当前的执行环境
    val evn = StreamExecutionEnvironment.getExecutionEnvironment
    //evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //kafka的consumer,test1是要消费的topic
    val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps)
    //kafkaSource.assignTimestampsAndWatermarks(assigner)
    //设置从最新的offset开始消费
    //kafkaSource.setStartFromGroupOffsets()
    kafkaSource.setStartFromLatest()
    //自动提交offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    
    //flink的checkpoint的时间间隔
    //evn.enableCheckpointing(2000)
    //添加consumer
    val stream = evn.addSource(kafkaSource)
    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    //stream.setParallelism(3)
    val text = stream.flatMap{ _.toLowerCase().split(" ")filter { _.nonEmpty} }
          .map{(_,1)}
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .map(x=>{(x._1,(new Integer(x._2)))})
     //text.print()
     //启动执行    
     
     text.addSink(new Ssinks())
     
    evn.execute("kafkawd")  
    
  }
}
//自定义sink
package flink.streaming
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.configuration.Configuration
class Ssinks extends RichSinkFunction[(String,Integer)]{
  
      var conn:Connection=_;
      var pres:PreparedStatement = _;
      var username = "root";
      var password = "123456";
      var dburl = "jdbc:mysql://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false";
      var sql = "insert into words(word,count) values(?,?)";
  override def invoke(value:(String, Integer) ) {
    
    pres.setString(1, value._1);
		pres.setInt(2,value._2);
		pres.executeUpdate();
		System.out.println("values :" +value._1+"--"+value._2);
  }
  
  override def open( parameters:Configuration) {
		Class.forName("com.mysql.jdbc.Driver");
		conn = DriverManager.getConnection(dburl, username, password);
		pres = conn.prepareStatement(sql);
		super.close()
	}
  
	override def close() {
	  pres.close();
	  conn.close();
	}
}

“flink将数据录入数据库”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联-成都网站建设公司网站,小编将为大家输出更多高质量的实用文章!


网站名称:flink将数据录入数据库-创新互联
转载源于:http://csdahua.cn/article/hoiii.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流