This article is based on Flink 1.12-SNAPSHOT and walks through the whole process of submitting an INSERT statement from the SQL Client command line.

sql-client.sh embedded --update "INSERT INTO user_log_sink2 SELECT * FROM user_log"

Initialize The environment


Class and method: org.apache.flink.table.client.SqlClient#main


public static void main(String[] args) {
if  (args.length < 1) {   CliOptionsParser.printHelpClient();


  }  switch (args[0]) {


     remove mode    final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);    final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);

    if (options.isPrintHelp()) {


    } else {

     try {

      final SqlClient client = new SqlClient(true, options);

      client.start();     } catch (SqlClientException e) {

      // make space in terminal

      System.out.println();      System.out.println();

      LOG.error("SQL Client must stop.", e);

      throw e;     } catch (Throwable t) {

      // make space in terminal

      System.out.println();      System.out.println();

      LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);

      throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);     }    }



    throw new SqlClientException("Gateway mode is not supported yet.");   default:    CliOptionsParser.printHelpClient();  

} } }

First, the number of arguments is checked; then the execution mode (embedded or gateway) is selected according to the first argument. This run enters embedded mode.

Next is to parse the command-line arguments.

The currently supported parameters can be found in org.apache.flink.table.client.cli.CliOptionsParser#parseEmbeddedModeClient.

The org.apache.flink.table.client.cli.CliOptionsParser class is used to parse the command line.

Then a SqlClient object is created from the incoming arguments, and its start method is called:

private void start() {
if (isEmbedded) { /
/ create local executor with default environment   final List jars;

   if (options.getJars() != null) {

    jars = options.getJars();

   } else {

    jars = Collections.emptyList();   }   final List libDirs;

   if (options.getLibraryDirs() != null) {

    libDirs = options.getLibraryDirs();

   } else {

    libDirs = Collections.emptyList();   }   final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);   executor.start();    create CLI client with session environment   final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());   appendPythonConfig(sessionEnv, options.getPythonConfiguration());   final SessionContext context;

   if (options.getSessionId() == null) {

    context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);

   } else {

    context = new SessionContext(options.getSessionId(), sessionEnv);   }   // Open an new session   String sessionId = executor.openSession(context);   try {    // add shutdown hook    Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

    // do the actual work

    openCli(sessionId, executor);   } finally {    executor.closeSession(sessionId);   }

  } else {

   throw new SqlClientException("Gateway mode is not supported yet.");  

} }}

The start method first instantiates the local executor from the default sql-client-defaults.yaml configuration file and calls the executor's start method, although no processing is done in that method.

It then reads the session environment to generate the SessionContext. Note that the session environment here is actually the configuration file specified by the user through the -e parameter.

Here is a brief introduction to the org.apache.flink.table.client.gateway.SessionContext class: it describes a session and is mainly used to open a new session on the backend. If a client requests a new session, the backend Executor maintains an org.apache.flink.table.client.gateway.local.ExecutionContext for it, and the session ID must be attached to every subsequent client interaction.

Then pass the context object into the executor.openSession method to get the sessionId.

Then create a shutdown hook, the main work of this hook is to kill the submitted query job before closing the SQL client, to prevent the query job from running on the cluster and wasting resources.

private  void cancelQueryInternal(ExecutionContext context, String resultId) {  final DynamicResult result = resultStore.getResult(resultId); 

  if (result == null) {

   throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'." );  }  // stop retrieval and remove the result

  LOG.info("Cancelling job {} and result retrieval.", resultId);

  result.close();  resultStore.removeResult(resultId);   stop Flink job  try (final ClusterDescriptor clusterDescriptor = context.createClusterDescriptor()) {   ClusterClient  clusterClient = null;   try {    // retrieve existing cluster    clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();    try {     clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();    } catch (Throwable t) {     // the job might has finished earlier    }   } catch (Exception e) {

    throw new SqlExecutionException("Could not retrieve or create a cluster.", e);

   } finally {    try {

     if (clusterClient != null) {

      clusterClient.close();     }    } catch (Exception e) {     // ignore    }   }  } catch (SqlExecutionException e) {   throw e;  } catch (Exception e) {

   throw new SqlExecutionException("Could not locate a cluster.", e);


Next, the sessionId and LocalExecutor objects are passed into the openCli method, which is where the actual work happens.

Opens the CLI client for executing SQL statements.

  * Opens the CLI client for executing SQL statements.  *

  * @param sessionId session identifier for the current client.

  * @param executor executor  */ private void openCli(String sessionId, Executor executor) {  CliClient cli = null;  try {   Path historyFilePath;

   if (options.getHistoryFilePath() != null) {

    historyFilePath = Paths.get(options.getHistoryFilePath());

   } else {

    historyFilePath = Paths.get(System.getProperty("user.home"),
      SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");   }   cli = new CliClient(sessionId, executor, historyFilePath);    interactive CLI mode

   if (options.getUpdateStatement() == null) {

    cli.open();   }   // execute single update statement

   else {

    final boolean success = cli.submitUpdate(options.getUpdateStatement());

    if (!success) {

     throw new SqlClientException("Could not submit given SQL update statement to cluster.");    }   }  } finally {

   if (cli != null) {


} } } }First

determine whether historyFilePath is specified in the command-line arguments; if it is not specified explicitly, the .flink-sql-history file under the current user's home directory is used as historyFilePath (on Windows the file is named flink-sql-history, without the leading dot).

Here, since we pass the SQL statement directly through the --update parameter on the command line, the client does not enter the interactive terminal mode; instead, a single update statement is executed.

Here options.getUpdateStatement() is the SQL statement we passed on the command line, that is, INSERT INTO user_log_sink2 SELECT * FROM user_log

Execute single update statement

Execute the submitUpdate method:

 /**  * Submits a SQL update statement and prints status information and/or errors on the terminal.  *  * @param statement SQL update statement

  * @return flag to indicate if the submission was successful or not

  */ public boolean submitUpdate(String statement) {  terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());  terminal.writer().println(new AttributedString(statement).toString());  terminal.flush();  final Optional parsedStatement = parseCommand(statement);   only support INSERT INTO/OVERWRITE

  return parsedStatement.map(cmdCall -> {

   switch (cmdCall.command) {

    case INSERT_INTO:

     return callInsert(cmdCall);    default:     printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);

     return false;




This first prints two lines of information to the terminal:

[INFO] Executing the following statement: INSERT INTO user_log_sink2 SELECT * FROM user_log

Parsing SQL Statement

It then parses the incoming SQL statement:

 private Optional parseCommand(String line) { final Optional parsedLine = SqlCommandParser.parse(executor.getSqlParser(sessionId), line); 

  if (!parsedLine.isPresent()) {

   printError(CliStrings.MESSAGE_UNKNOWN_SQL);  }

  return parsedLine;

First get the Parser object from executor.getSqlParser(sessionId).

 @Override public Parser getSqlParser(String sessionId) {  final ExecutionContext context = getExecutionContext(sessionId);  final TableEnvironment tableEnv = context.getTableEnvironment();  final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser(); 

  return new Parser() {

   @Override   public List parse(String statement) {

    return  context.wrapClassLoader(() -> parser.parse(statement));

   }   @Override   public UnresolvedIdentifier parseIdentifier(String identifier) {

    return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));

   }  }; Pass


the Parser object and the SQL statement into the SqlCommandParser.parse method. org.apache.flink.table.client.cli.SqlCommandParser is a simple parser for determining the type of a command and its arguments.

  public static Optional parse(Parser sqlParser, String stmt) {  // normalize  stmt = stmt.trim(); 

  // remove '; ' at the end

  if (stmt.endsWith(";" )) {   stmt = stmt.substring(0, stmt.length() - 1).trim();  }  // parse statement via sql parser first  Optional callOpt = parseBySqlParser(sqlParser, stmt);

  if (callOpt.isPresent()) {

   return callOpt;
  } else {
   return parseByRegexMatching(stmt);  } } private static Optional parseBySqlParser(Parser sqlParser, String stmt) {  List operations;  try {   operations = sqlParser.parse(stmt);  } catch (Throwable e) {

   if (e instanceof ValidationException) {

    // can be parsed via sql parser, but is not validated.     throw exception directly

    throw new SqlExecutionException("Invalidate SQL statement.", e);


   return Optional.empty();


  if (operations.size() != 1) {

   throw new SqlExecutionException("Only single statement is supported now." );  }  final SqlCommand cmd;  String[] operands = new String[] { stmt };  Operation operation = operations.get(0);

  if (operation instanceof CatalogSinkModifyOperation) {

   boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();   cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;

  } else if (operation instanceof CreateTableOperation) {

   cmd = SqlCommand.CREATE_TABLE;

  } else if (operation instanceof DropTableOperation) {

   cmd = SqlCommand.DROP_TABLE;

  } else if (operation instanceof AlterTableOperation) {

   cmd = SqlCommand.ALTER_TABLE;

  } else if (operation instanceof CreateViewOperation) {

   cmd = SqlCommand.CREATE_VIEW;   CreateViewOperation op = (CreateViewOperation) operation;   operands = new String[] { op.getViewIdentifier().asSerializableString(),     op.getCatalogView().getOriginalQuery() };

  } else if (operation instanceof DropViewOperation) {

   cmd = SqlCommand.DROP_VIEW;   operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };

  } else if (operation instanceof CreateDatabaseOperation) {

   cmd = SqlCommand.CREATE_DATABASE;

  } else if (operation instanceof DropDatabaseOperation) {

   cmd = SqlCommand.DROP_DATABASE;

  } else if (operation instanceof AlterDatabaseOperation) {

   cmd = SqlCommand.ALTER_DATABASE;

  } else if (operation instanceof CreateCatalogOperation) {

   cmd = SqlCommand.CREATE_CATALOG;

  } else if (operation instanceof DropCatalogOperation) {

   cmd = SqlCommand.DROP_CATALOG;

  } else if (operation instanceof UseCatalogOperation) {

   cmd = SqlCommand.USE_CATALOG;   operands = new String[] { ((UseCatalogOperation) operation).getCatalogName() };

  } else if (operation instanceof UseDatabaseOperation) {

   cmd = SqlCommand.USE;   operands = new String[] { ((UseDatabaseOperation) operation).getDatabaseName() };

  } else if (operation instanceof ShowCatalogsOperation) {

   cmd = SqlCommand.SHOW_CATALOGS;   operands = new String[0];

  } else if (operation instanceof ShowDatabasesOperation) {

   cmd = SqlCommand.SHOW_DATABASES;   operands = new String[0];

  } else if (operation instanceof ShowTablesOperation) {

   cmd = SqlCommand.SHOW_TABLES;   operands = new String[0];

  } else if (operation instanceof ShowFunctionsOperation) {

   cmd = SqlCommand.SHOW_FUNCTIONS;   operands = new String[0];

  } else if (operation instanceof CreateCatalogFunctionOperation ||

    operation instanceof CreateTempSystemFunctionOperation) {   cmd = SqlCommand.CREATE_FUNCTION;

  } else if (operation instanceof DropCatalogFunctionOperation ||

    operation instanceof DropTempSystemFunctionOperation) {   cmd = SqlCommand.DROP_FUNCTION;

  } else if (operation instanceof AlterCatalogFunctionOperation) {

   cmd = SqlCommand.ALTER_FUNCTION;

  } else if (operation instanceof ExplainOperation) {

   cmd = SqlCommand.EXPLAIN;

  } else if (operation instanceof DescribeTableOperation) {

   cmd = SqlCommand.DESCRIBE;   operands = new String[] { ((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString() };

  } else if (operation instanceof QueryOperation) {

   cmd = SqlCommand.SELECT;

  } else {

   cmd = null;  }

  return cmd == null ? Optional.empty() : Optional.of(new SqlCommandCall(cmd, operands));


Finally, control returns to the call `final Optional<SqlCommandCall> parsedStatement = parseCommand(statement)` in org.apache.flink.table.client.cli.CliClient#submitUpdate.

Then execute

 // Excerpt from submitUpdate:
 // only support INSERT INTO/OVERWRITE
 return parsedStatement.map(cmdCall -> {
     switch (cmdCall.command) {
         case INSERT_INTO:
         case INSERT_OVERWRITE:
             return callInsert(cmdCall);
         default:
             printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
             return false;
     }
 }).orElse(false);



Call Insert Method

Execution then goes into the callInsert method:

   private boolean callInsert(SqlCommandCall cmdCall) {  printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);  try {   final ProgramTargetDescriptor programTarget = executor.executeUpdate(sessionId, cmdCall.operands[0]);   terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());   terminal.writer().println(programTarget.toString());   terminal.flush();  } catch (SqlExecutionException e) {   printExecutionException(e); 

   return false;


  return true;


This will first print a line of information in the terminal:

[INFO] Submitting SQL update statement to the cluster...

Then execute the executor.executeUpdate(sessionId, cmdCall.operands[0]) method

 @Override public ProgramTargetDescriptor executeUpdate(String sessionId, String statement) throws SqlExecutionException {  final ExecutionContext context = getExecutionContext( sessionId); 

  return executeUpdateInternal(sessionId, context, statement);


which delegates to the executeUpdateInternal method:

private  ProgramTargetDescriptor executeUpdateInternal( String sessionId, ExecutionContext  context,   String statement) {  applyUpdate(context, statement);  Todo: we should refactor following condition after TableEnvironment has support submit job directly. 

  if (! INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {

   return null;  }  // create pipeline

  final String jobName = sessionId + ": " + statement;

  final Pipeline pipeline;  try {   pipeline = context.createPipeline(jobName);  } catch (Throwable t) {   // catch everything such that the statement does not crash the executor

   throw new SqlExecutionException("Invalid SQL statement.", t);

  }  // create a copy so that we can change settings without affecting the original config  Configuration configuration = new Configuration(context.getFlinkConfig());

  // for update queries we don

't wait for the job result, so run in detached mode  configuration.set(DeploymentOptions.ATTACHED, false);   create execution  final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);   blocking deployment  try {   JobClient jobClient = deployer.deploy().get();   return ProgramTargetDescriptor.of(jobClient.getJobID());  } catch (Exception e) {   throw new RuntimeException("Error running SQL job.", e);  } }Buffer


Execution first goes into the applyUpdate(context, statement) method:

   /**  * Applies the given update statement to the given table environment with query configuration.  */ private  void applyUpdate(ExecutionContext context, String updateStatement) {  final TableEnvironment tableEnv = context.getTableEnvironment();  try {   // TODO replace sqlUpdate with executeSql

   // This needs we do more refactor, because we can

't set the flinkConfig in ExecutionContext   // into StreamExecutionEnvironment   context.wrapClassLoader(() -> tableEnv.sqlUpdate(updateStatement));  } catch (Throwable t) {   // catch everything such that the statement does not crash the executor   throw new SqlExecutionException("Invalid SQL update statement.", t);  } } Enter

the tableEnv.sqlUpdate(updateStatement) method

@Override public void sqlUpdate(String stmt) {  List operations = parser.parse(stmt); 

  if (operations.size() != 1) {

   throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);  }  Operation operation = operations.get(0);

  if (operation instanceof ModifyOperation) {

   buffer(Collections.singletonList((ModifyOperation) operation));

  } else if (operation instanceof CreateTableOperation ||

    operation instanceof DropTableOperation ||    operation instanceof AlterTableOperation ||    operation instanceof CreateViewOperation ||    operation instanceof DropViewOperation ||    operation instanceof CreateDatabaseOperation ||    operation instanceof DropDatabaseOperation ||    operation instanceof AlterDatabaseOperation ||    operation instanceof CreateCatalogFunctionOperation ||    operation instanceof CreateTempSystemFunctionOperation ||    operation instanceof DropCatalogFunctionOperation ||    operation instanceof DropTempSystemFunctionOperation ||    operation instanceof AlterCatalogFunctionOperation ||    operation instanceof CreateCatalogOperation ||    operation instanceof DropCatalogOperation ||    operation instanceof UseCatalogOperation ||    operation instanceof UseDatabaseOperation) {   executeOperation(operation);

  } else {

   throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);  } }

The parser.parse(stmt) method eventually returns Collections.singletonList(operation):

 @Override public List  parse(String statement) {  CalciteParser parser = calciteParserSupplier.get();  FlinkPlannerImpl planner = validatorSupplier.get();   parse the sql query  SqlNode parsed = parser.parse(statement);  Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)

   .orElseThrow(() -> new TableException("Unsupported query: " + statement));

  return Collections.singletonList(operation); }

buffer(Collections.singletonList((ModifyOperation) operation)) adds the operation to the private bufferedModifyOperations list:

 void buffer(List  modifyOperations) {  bufferedModifyOperations.addAll(modifyOperations); }

Create pipeline and blocking deployment

Execution then returns to org.apache.flink.table.client.gateway.local.LocalExecutor#executeUpdateInternal:

    //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
  if (! INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
   return null;  }  // create pipeline

  final String jobName = sessionId + ": " + statement;

  final Pipeline pipeline;  try {   pipeline = context.createPipeline(jobName);  } catch (Throwable t) {   // catch everything such that the statement does not crash the executor

   throw new SqlExecutionException("Invalid SQL statement.", t);

  }  // create a copy so that we can change settings without affecting the original config  Configuration configuration = new Configuration(context.getFlinkConfig());

  // for update queries we don

't wait for the job result, so run in detached mode  configuration.set(DeploymentOptions.ATTACHED, false);   create execution  final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);   blocking deployment  try {   JobClient jobClient = deployer.deploy().get();   return ProgramTargetDescriptor.of(jobClient.getJobID());  } catch (Exception e) {   throw new RuntimeException("Error running SQL job.", e);  }context.createPipeline(jobName)public Pipeline createPipeline(String name) {  return wrapClassLoader(() -> {   if (streamExecEnv != null) {    StreamTableEnvironmentImpl streamTableEnv =  (StreamTableEnvironmentImpl) tableEnv;    return streamTableEnv.getPipeline(name);   } else {    BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;    return batchTableEnv.getPipeline(name);   }  }); }

Detached-mode submission

configuration.set(DeploymentOptions.ATTACHED, false );// create executionfinal ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline); 

org.apache.flink.table.client.gateway.local.ProgramDeployer is used to deploy a table program on a cluster.

Asynchronous submission of Flink Job

 public CompletableFuture deploy() {
LOG.info("Submitting job {} for query {}'" , pipeline, jobName);
  if (LOG.isDebugEnabled()) {
   LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);  }

  if (configuration.get(DeploymentOptions.TARGET) == null) {

   throw new RuntimeException("No execution.target specified in your configuration file." );  }  PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;  final PipelineExecutorFactory executorFactory;  try {   executorFactory = executorServiceLoader.getExecutorFactory(configuration);  } catch (Exception e) {

   throw new RuntimeException("Could not retrieve ExecutorFactory.", e);

  }  final PipelineExecutor executor = executorFactory.getExecutor(configuration);  CompletableFuture jobClient;  try {   jobClient = executor.execute(pipeline, configuration);  } catch (Exception e) {

   throw new RuntimeException("Could not execute program.", e);


  return jobClient;


Address: https://github.com/y0908105023/wiki/wiki/Flink-Sql-Client-%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90
author: y0908105023