Flink自定义实现端到端的exactly-once语义

Flink自定义实现端到端的exactly-once语义
Flink 中有两个 Exactly-Once 语义实现,一个是 Kafka,另一个是 StreamingFileSink。
 
参考他们实现的逻辑,来自定义实现端到端exactly-once语义。
 
分析:
 
Flink的checkpoint机制(通过Chandy-Lamport):
 
JobManager的CheckpointCoordinator通过在stream中添加barrier,当barrier前的数据的所有operator的checkpoint都操作完成并返回CheckpointCoordinator,才代表此次checkpoint执行完成;
 
checkpoint机制可以保证不丢数据,因为每次恢复的时候都是从最后一次checkpoint成功的地方开始处理,这样可能会重复处理某些数据,实现了at-least-once,没法做到exactly-once语义;
 
flink提供了TwoPhaseCommit两阶段提交机制:pre-commit预提交和commit正式提交,其中pre-commit不是真正提交了,可以回滚的,当两次checkpoint间某operator挂了,此时sink端预提交的数据操作会被回滚,然后从最后一次checkpoint成功的地方开始处理,实现了exactly-once语义。
 
实现:
 
具体实现主要是通过继承TwoPhaseCommitSinkFunction,重写里面的方法,关闭mysql自动提交,在commit()方法中真正提交,abort()方法中rollback
 
主要代码如下:
 
public class TwoPhaseCommitMysqlConsumerDemo {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        //  设置ck属性
 
        env.setStateBackend(new FsStateBackend("hdfs://zcx1:9000:/flink/ck"));
 
        env.enableCheckpointing(10000);
 
        env.getCheckpointConfig()。setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 
        env.getCheckpointConfig()。setMinPauseBetweenCheckpoints(500);
 
        env.getCheckpointConfig()。setCheckpointTimeout(60000);
 
        env.getCheckpointConfig()。setFailOnCheckpointingErrors(false);
 
        env.getCheckpointConfig()。enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
        env.getCheckpointConfig()。setMaxConcurrentCheckpoints(1);
 
        Properties properties=new Properties();
 
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"zcx4:9092,zcx5:9092,zcx6:9092");
 
        //设置读取已提交的数据
 
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
 
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
 
        FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("zcx2222", new SimpleStringSchema(),properties);
 
        DataStreamSource dataStreamSource = env.addSource(kafka);
 
        SingleOutputStreamOperator sum = dataStreamSource.flatMap(new FlatMapFunction<String, WC>() {
 
            @Override
 
            public void flatMap(String s, Collector<WC> collector) throws Exception {
 
                String[] split = s.split(" ");
 
                for (String ss : split) {
 
                    collector.collect(new WC(ss, 1));
 
                }
 
            }
 
        })。keyBy("word")。sum("num");
 
        sum.print();
 
        TwoPhaseCommitSinkFunction<WC, MyConnection, Void> twoPhaseCommitSinkFunction = new TwoPhaseCommitSinkFunction<WC, MyConnection, Void>(new KryoSerializer<MyConnection>(MyConnection.class, new ExecutionConfig()), new VoidSerializer()) {
 
            @Override
 
            protected void invoke(MyConnection myconnection, WC wc, Context context) throws Exception {
 
                PreparedStatement preparedStatement = myconnection.connection.prepareStatement("insert into wc values(?,?) on duplicate key update num=?");
 
                preparedStatement.setString(1, wc.word);
 
                preparedStatement.setInt(2, wc.num);
 
                preparedStatement.setInt(3, wc.num);
 
                preparedStatement.executeUpdate();
 
                preparedStatement.close();
 
            }
 
            @Override
 
            protected MyConnection beginTransaction() throws Exception {
 
                Class.forName("com.mysql.jdbc.Driver");
 
                Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
 
                //关闭自动提交
 
                connection.setAutoCommit(false);
 
                return new MyConnection(connection);
 
            }
 
            @Override
 
            protected void preCommit(MyConnection myconnection) throws Exception {
 
            //invoke中完成
 
            }
 
            @Override
 
            protected void commit(MyConnection myconnection) {
 
                try {
 
                    myconnection.connection.commit();
 
                    myconnection.connection.close();
 
                } catch (SQLException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
            @Override
 
            protected void abort(MyConnection myconnection) {
 
                try {
 
                    myconnection.connection.rollback();
 
                    myconnection.connection.close();
 
                } catch (SQLException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
        };
 
        sum.addSink(twoPhaseCommitSinkFunction);
 
        env.execute();
 
    }
 
}
 
class MyConnection{
 
 transient Connection connection;
 
    public MyConnection(Connection connection) {
 
        this.connection = connection;
 
    }
 
}

推荐阅读