Abstract
This article is based on Flink 1.12-SNAPSHOT and walks through the whole process of submitting an INSERT statement through the SQL Client command line:
sql-client.sh embedded --update "INSERT INTO user_log_sink2 SELECT * FROM user_log"
Initialize the environment
main
class: org.apache.flink.table.client.SqlClient#main
public static void main(String[] args) {
    if (args.length < 1) {
        CliOptionsParser.printHelpClient();
        return;
    }

    switch (args[0]) {

        case MODE_EMBEDDED:
            // remove mode
            final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    final SqlClient client = new SqlClient(true, options);
                    client.start();
                } catch (SqlClientException e) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop.", e);
                    throw e;
                } catch (Throwable t) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                    throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                }
            }
            break;

        case MODE_GATEWAY:
            throw new SqlClientException("Gateway mode is not supported yet.");

        default:
            CliOptionsParser.printHelpClient();
    }
}
First, the number of arguments is checked and the execution mode (EMBEDDED or GATEWAY) is chosen from the first argument; in this case it is EMBEDDED.
Next, the command-line arguments are parsed. The currently supported options can be found in org.apache.flink.table.client.cli.CliOptionsParser#parseEmbeddedModeClient; the CliOptionsParser class is responsible for parsing the command line.
A SqlClient object is then created from the parsed options and its start method is called.
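Besides -u/--update used here, the parsed CliOptions carry the other embedded-mode settings that show up later in this walkthrough, such as the jar and library paths (-j/--jar, -l/--library), the defaults and session environment files (-d/--defaults, -e/--environment), the session id (-s/--session) and the history file path; the full list of supported options is defined in CliOptionsParser.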
private void start() {
    if (isEmbedded) {
        // create local executor with default environment
        final List<URL> jars;
        if (options.getJars() != null) {
            jars = options.getJars();
        } else {
            jars = Collections.emptyList();
        }
        final List<URL> libDirs;
        if (options.getLibraryDirs() != null) {
            libDirs = options.getLibraryDirs();
        } else {
            libDirs = Collections.emptyList();
        }
        final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
        executor.start();

        // create CLI client with session environment
        final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
        appendPythonConfig(sessionEnv, options.getPythonConfiguration());
        final SessionContext context;
        if (options.getSessionId() == null) {
            context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
        } else {
            context = new SessionContext(options.getSessionId(), sessionEnv);
        }

        // Open an new session
        String sessionId = executor.openSession(context);
        try {
            // add shutdown hook
            Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

            // do the actual work
            openCli(sessionId, executor);
        } finally {
            executor.closeSession(sessionId);
        }
    } else {
        throw new SqlClientException("Gateway mode is not supported yet.");
    }
}
start first instantiates a LocalExecutor based on the default sql-client-defaults.yaml configuration file and calls its start method, which currently does nothing.
It then reads the session environment and builds a SessionContext from it; note that the session environment here is the configuration file the user specified with the -e option.
A brief introduction to the org.apache.flink.table.client.gateway.SessionContext class: it describes a session and is mainly used to open a new session against the backend. When a client requests a new session, the backend {@link Executor} maintains an {@link org.apache.flink.table.client.gateway.local.ExecutionContext} for it, and the session ID must be attached to every subsequent client interaction.
The context object is then passed into executor.openSession to obtain the sessionId.
Next, a shutdown hook is registered. Its main job is to cancel the submitted query jobs before the SQL Client exits, so that the queries do not keep running on the cluster and waste resources; the cancellation logic, cancelQueryInternal, is shown after the sketch below.
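The hook itself is the EmbeddedShutdownThread registered in start above. Below is a minimal sketch, not the actual Flink class, of what it conceptually does; it only assumes the Executor interface already used in this article, whose closeSession in turn cancels the still-running queries:

// Minimal sketch of the shutdown hook's job (the real class is
// org.apache.flink.table.client.SqlClient.EmbeddedShutdownThread).
class ShutdownHookSketch extends Thread {

    private final String sessionId;
    private final Executor executor;

    ShutdownHookSketch(String sessionId, Executor executor) {
        this.sessionId = sessionId;
        this.executor = executor;
    }

    @Override
    public void run() {
        // closing the session releases cluster resources and stops submitted queries
        executor.closeSession(sessionId);
    }
}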
private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
    final DynamicResult<T> result = resultStore.getResult(resultId);
    if (result == null) {
        throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
    }

    // stop retrieval and remove the result
    LOG.info("Cancelling job {} and result retrieval.", resultId);
    result.close();
    resultStore.removeResult(resultId);

    // stop Flink job
    try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
        ClusterClient<T> clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
            try {
                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
            } catch (Throwable t) {
                // the job might has finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } catch (Exception e) {
                // ignore
            }
        }
    } catch (SqlExecutionException e) {
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
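Note that the result identifier here is the hexadecimal string form of the query's JobID, which is why it can be converted back with StringUtils.hexStringToByte and used to cancel the corresponding job on the cluster.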
Finally, the sessionId and the LocalExecutor object are passed into the openCli method, which opens the CLI client for executing SQL statements and does the actual work.
/**
 * Opens the CLI client for executing SQL statements.
 *
 * @param sessionId session identifier for the current client.
 * @param executor executor
 */
private void openCli(String sessionId, Executor executor) {
    CliClient cli = null;
    try {
        Path historyFilePath;
        if (options.getHistoryFilePath() != null) {
            historyFilePath = Paths.get(options.getHistoryFilePath());
        } else {
            historyFilePath = Paths.get(System.getProperty("user.home"),
                    SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
        }
        cli = new CliClient(sessionId, executor, historyFilePath);
        // interactive CLI mode
        if (options.getUpdateStatement() == null) {
            cli.open();
        }
        // execute single update statement
        else {
            final boolean success = cli.submitUpdate(options.getUpdateStatement());
            if (!success) {
                throw new SqlClientException("Could not submit given SQL update statement to cluster.");
            }
        }
    } finally {
        if (cli != null) {
            cli.close();
        }
    }
}
openCli first checks whether historyFilePath was given on the command line; if it was not explicitly specified, .flink-sql-history under the current user's HOME directory is used as the history file path.
Here, since we pass the SQL statement directly through the --update option on the command line, the client does not enter the interactive terminal mode; instead a single update statement is executed:
cli.submitUpdate(options.getUpdateStatement())
where options.getUpdateStatement() is the SQL statement passed on the command line, i.e. INSERT INTO user_log_sink2 SELECT * FROM user_log.
Execute single update statement
Enter the submitUpdate method:
/**
 * Submits a SQL update statement and prints status information and/or errors on the terminal.
 *
 * @param statement SQL update statement
 * @return flag to indicate if the submission was successful or not
 */
public boolean submitUpdate(String statement) {
    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
    terminal.writer().println(new AttributedString(statement).toString());
    terminal.flush();

    final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
    // only support INSERT INTO/OVERWRITE
    return parsedStatement.map(cmdCall -> {
        switch (cmdCall.command) {
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                return callInsert(cmdCall);
            default:
                printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
                return false;
        }
    }).orElse(false);
}
First, two lines of information are printed on the terminal:
[INFO] Executing the following statement:
INSERT INTO user_log_sink2 SELECT * FROM user_log
Parsing SQL Statement
submitUpdate then parses the incoming SQL statement:
private Optional<SqlCommandCall> parseCommand(String line) {
    final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(executor.getSqlParser(sessionId), line);
    if (!parsedLine.isPresent()) {
        printError(CliStrings.MESSAGE_UNKNOWN_SQL);
    }
    return parsedLine;
}
First get the Parser object from executor.getSqlParser(sessionId).
@Override
public Parser getSqlParser(String sessionId) {
    final ExecutionContext<?> context = getExecutionContext(sessionId);
    final TableEnvironment tableEnv = context.getTableEnvironment();
    final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
    return new Parser() {
        @Override
        public List<Operation> parse(String statement) {
            return context.wrapClassLoader(() -> parser.parse(statement));
        }

        @Override
        public UnresolvedIdentifier parseIdentifier(String identifier) {
            return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
        }
    };
}
The Parser object and the SQL statement are then passed into SqlCommandParser.parse; org.apache.flink.table.client.cli.SqlCommandParser is a simple parser for determining the type of a command and its parameters.
public static Optional<SqlCommandCall> parse(Parser sqlParser, String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }

    // parse statement via sql parser first
    Optional<SqlCommandCall> callOpt = parseBySqlParser(sqlParser, stmt);
    if (callOpt.isPresent()) {
        return callOpt;
    } else {
        return parseByRegexMatching(stmt);
    }
}

private static Optional<SqlCommandCall> parseBySqlParser(Parser sqlParser, String stmt) {
    List<Operation> operations;
    try {
        operations = sqlParser.parse(stmt);
    } catch (Throwable e) {
        if (e instanceof ValidationException) {
            // can be parsed via sql parser, but is not validated. throw exception directly
            throw new SqlExecutionException("Invalidate SQL statement.", e);
        }
        return Optional.empty();
    }
    if (operations.size() != 1) {
        throw new SqlExecutionException("Only single statement is supported now.");
    }

    final SqlCommand cmd;
    String[] operands = new String[] { stmt };
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
        cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
    } else if (operation instanceof CreateTableOperation) {
        cmd = SqlCommand.CREATE_TABLE;
    } else if (operation instanceof DropTableOperation) {
        cmd = SqlCommand.DROP_TABLE;
    } else if (operation instanceof AlterTableOperation) {
        cmd = SqlCommand.ALTER_TABLE;
    } else if (operation instanceof CreateViewOperation) {
        cmd = SqlCommand.CREATE_VIEW;
        CreateViewOperation op = (CreateViewOperation) operation;
        operands = new String[] {
            op.getViewIdentifier().asSerializableString(),
            op.getCatalogView().getOriginalQuery()
        };
    } else if (operation instanceof DropViewOperation) {
        cmd = SqlCommand.DROP_VIEW;
        operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };
    } else if (operation instanceof CreateDatabaseOperation) {
        cmd = SqlCommand.CREATE_DATABASE;
    } else if (operation instanceof DropDatabaseOperation) {
        cmd = SqlCommand.DROP_DATABASE;
    } else if (operation instanceof AlterDatabaseOperation) {
        cmd = SqlCommand.ALTER_DATABASE;
    } else if (operation instanceof CreateCatalogOperation) {
        cmd = SqlCommand.CREATE_CATALOG;
    } else if (operation instanceof DropCatalogOperation) {
        cmd = SqlCommand.DROP_CATALOG;
    } else if (operation instanceof UseCatalogOperation) {
        cmd = SqlCommand.USE_CATALOG;
        operands = new String[] { ((UseCatalogOperation) operation).getCatalogName() };
    } else if (operation instanceof UseDatabaseOperation) {
        cmd = SqlCommand.USE;
        operands = new String[] { ((UseDatabaseOperation) operation).getDatabaseName() };
    } else if (operation instanceof ShowCatalogsOperation) {
        cmd = SqlCommand.SHOW_CATALOGS;
        operands = new String[0];
    } else if (operation instanceof ShowDatabasesOperation) {
        cmd = SqlCommand.SHOW_DATABASES;
        operands = new String[0];
    } else if (operation instanceof ShowTablesOperation) {
        cmd = SqlCommand.SHOW_TABLES;
        operands = new String[0];
    } else if (operation instanceof ShowFunctionsOperation) {
        cmd = SqlCommand.SHOW_FUNCTIONS;
        operands = new String[0];
    } else if (operation instanceof CreateCatalogFunctionOperation ||
            operation instanceof CreateTempSystemFunctionOperation) {
        cmd = SqlCommand.CREATE_FUNCTION;
    } else if (operation instanceof DropCatalogFunctionOperation ||
            operation instanceof DropTempSystemFunctionOperation) {
        cmd = SqlCommand.DROP_FUNCTION;
    } else if (operation instanceof AlterCatalogFunctionOperation) {
        cmd = SqlCommand.ALTER_FUNCTION;
    } else if (operation instanceof ExplainOperation) {
        cmd = SqlCommand.EXPLAIN;
    } else if (operation instanceof DescribeTableOperation) {
        cmd = SqlCommand.DESCRIBE;
        operands = new String[] { ((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString() };
    } else if (operation instanceof QueryOperation) {
        cmd = SqlCommand.SELECT;
    } else {
        cmd = null;
    }

    return cmd == null ? Optional.empty() : Optional.of(new SqlCommandCall(cmd, operands));
}
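For the statement in this article, the single parsed Operation is a CatalogSinkModifyOperation with isOverwrite() == false, so parseBySqlParser classifies it as INSERT_INTO and keeps the whole statement as the only operand. Illustrative only, using the SqlCommandCall constructor from the code above:

// what the parser effectively produces for our statement
SqlCommandCall call = new SqlCommandCall(
        SqlCommand.INSERT_INTO,
        new String[] { "INSERT INTO user_log_sink2 SELECT * FROM user_log" });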
The result finally returns to the call in org.apache.flink.table.client.cli.CliClient#submitUpdate:

final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);

Then the following is executed:

// only support INSERT INTO/OVERWRITE
return parsedStatement.map(cmdCall -> {
    switch (cmdCall.command) {
        case INSERT_INTO:
        case INSERT_OVERWRITE:
            return callInsert(cmdCall);
        default:
            printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
            return false;
    }
}).orElse(false);
Call Insert Method
Enter the callInsert method:

private boolean callInsert(SqlCommandCall cmdCall) {
    printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);

    try {
        final ProgramTargetDescriptor programTarget = executor.executeUpdate(sessionId, cmdCall.operands[0]);
        terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
        terminal.writer().println(programTarget.toString());
        terminal.flush();
    } catch (SqlExecutionException e) {
        printExecutionException(e);
        return false;
    }
    return true;
}
callInsert first prints a line of information in the terminal:
[INFO] Submitting SQL update statement to the cluster...
It then calls executor.executeUpdate(sessionId, cmdCall.operands[0]):
@Override
public ProgramTargetDescriptor executeUpdate(String sessionId, String statement) throws SqlExecutionException {
    final ExecutionContext<?> context = getExecutionContext(sessionId);
    return executeUpdateInternal(sessionId, context, statement);
}
Enter the executeUpdateInternal method:
private <C> ProgramTargetDescriptor executeUpdateInternal(
        String sessionId,
        ExecutionContext<C> context,
        String statement) {
    applyUpdate(context, statement);

    // TODO: we should refactor following condition after TableEnvironment has support submit job directly.
    if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
        return null;
    }

    // create pipeline
    final String jobName = sessionId + ": " + statement;
    final Pipeline pipeline;
    try {
        pipeline = context.createPipeline(jobName);
    } catch (Throwable t) {
        // catch everything such that the statement does not crash the executor
        throw new SqlExecutionException("Invalid SQL statement.", t);
    }

    // create a copy so that we can change settings without affecting the original config
    Configuration configuration = new Configuration(context.getFlinkConfig());
    // for update queries we don't wait for the job result, so run in detached mode
    configuration.set(DeploymentOptions.ATTACHED, false);

    // create execution
    final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

    // blocking deployment
    try {
        JobClient jobClient = deployer.deploy().get();
        return ProgramTargetDescriptor.of(jobClient.getJobID());
    } catch (Exception e) {
        throw new RuntimeException("Error running SQL job.", e);
    }
}
Buffer List
executeUpdateInternal first goes into the applyUpdate(context, statement) method:
/**
 * Applies the given update statement to the given table environment with query configuration.
 */
private <C> void applyUpdate(ExecutionContext<C> context, String updateStatement) {
    final TableEnvironment tableEnv = context.getTableEnvironment();
    try {
        // TODO replace sqlUpdate with executeSql
        // This needs we do more refactor, because we can't set the flinkConfig in ExecutionContext
        // into StreamExecutionEnvironment
        context.wrapClassLoader(() -> tableEnv.sqlUpdate(updateStatement));
    } catch (Throwable t) {
        // catch everything such that the statement does not crash the executor
        throw new SqlExecutionException("Invalid SQL update statement.", t);
    }
}
Enter the tableEnv.sqlUpdate(updateStatement) method:
@Override
public void sqlUpdate(String stmt) {
    List<Operation> operations = parser.parse(stmt);

    if (operations.size() != 1) {
        throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
    }

    Operation operation = operations.get(0);
    if (operation instanceof ModifyOperation) {
        buffer(Collections.singletonList((ModifyOperation) operation));
    } else if (operation instanceof CreateTableOperation ||
            operation instanceof DropTableOperation ||
            operation instanceof AlterTableOperation ||
            operation instanceof CreateViewOperation ||
            operation instanceof DropViewOperation ||
            operation instanceof CreateDatabaseOperation ||
            operation instanceof DropDatabaseOperation ||
            operation instanceof AlterDatabaseOperation ||
            operation instanceof CreateCatalogFunctionOperation ||
            operation instanceof CreateTempSystemFunctionOperation ||
            operation instanceof DropCatalogFunctionOperation ||
            operation instanceof DropTempSystemFunctionOperation ||
            operation instanceof AlterCatalogFunctionOperation ||
            operation instanceof CreateCatalogOperation ||
            operation instanceof DropCatalogOperation ||
            operation instanceof UseCatalogOperation ||
            operation instanceof UseDatabaseOperation) {
        executeOperation(operation);
    } else {
        throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
    }
}
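For our INSERT statement, parser.parse returns a single CatalogSinkModifyOperation, which is a ModifyOperation, so sqlUpdate takes the buffer branch rather than executeOperation.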
The parse(stmt) method eventually returns Collections.singletonList(operation):
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    // parse the sql query
    SqlNode parsed = parser.parse(statement);

    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}
buffer(Collections.singletonList((ModifyOperation) operation)) adds the operation to the internal bufferedModifyOperations list:

private void buffer(List<ModifyOperation> modifyOperations) {
    bufferedModifyOperations.addAll(modifyOperations);
}
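Note that the buffered operations are not translated or executed at this point; they are only consumed when the pipeline is created in the next step (see the sketch after createPipeline below).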
Create pipeline and blocking deployment
The flow then returns to org.apache.flink.table.client.gateway.local.LocalExecutor#executeUpdateInternal:

// TODO: we should refactor following condition after TableEnvironment has support submit job directly.
if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
    return null;
}

// create pipeline
final String jobName = sessionId + ": " + statement;
final Pipeline pipeline;
try {
    pipeline = context.createPipeline(jobName);
} catch (Throwable t) {
    // catch everything such that the statement does not crash the executor
    throw new SqlExecutionException("Invalid SQL statement.", t);
}

// create a copy so that we can change settings without affecting the original config
Configuration configuration = new Configuration(context.getFlinkConfig());
// for update queries we don't wait for the job result, so run in detached mode
configuration.set(DeploymentOptions.ATTACHED, false);

// create execution
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

// blocking deployment
try {
    JobClient jobClient = deployer.deploy().get();
    return ProgramTargetDescriptor.of(jobClient.getJobID());
} catch (Exception e) {
    throw new RuntimeException("Error running SQL job.", e);
}

context.createPipeline(jobName) builds the Pipeline:

public Pipeline createPipeline(String name) {
    return wrapClassLoader(() -> {
        if (streamExecEnv != null) {
            StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
            return streamTableEnv.getPipeline(name);
        } else {
            BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
            return batchTableEnv.getPipeline(name);
        }
    });
}
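getPipeline is where the ModifyOperations buffered by sqlUpdate are finally turned into an executable job graph. The following is a rough, simplified sketch of the idea, not the actual Flink source; it only assumes the Planner#translate API and the bufferedModifyOperations list introduced earlier, and buildStreamGraph is a hypothetical helper standing in for the real graph-generation code:

// Rough sketch of what StreamTableEnvironmentImpl#getPipeline(jobName) conceptually does
public Pipeline getPipelineSketch(String jobName) {
    // 1. take the ModifyOperations buffered by sqlUpdate and clear the buffer
    List<ModifyOperation> buffered = new ArrayList<>(bufferedModifyOperations);
    bufferedModifyOperations.clear();

    // 2. let the planner translate them into Transformations
    List<Transformation<?>> transformations = planner.translate(buffered);

    // 3. build the Pipeline (a StreamGraph in the streaming case) from these transformations
    //    and attach the job name; this Pipeline is what the ProgramDeployer submits below
    return buildStreamGraph(transformations, jobName); // hypothetical helper
}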
Detached mode submission
configuration.set(DeploymentOptions.ATTACHED, false);
// create execution
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
org.apache.flink.table.client.gateway.local.ProgramDeployer is used to deploy a table program on a cluster.
Asynchronous submission of Flink Job
public CompletableFuture<JobClient> deploy() {
    LOG.info("Submitting job {} for query {}'", pipeline, jobName);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
    }

    if (configuration.get(DeploymentOptions.TARGET) == null) {
        throw new RuntimeException("No execution.target specified in your configuration file.");
    }

    PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
    final PipelineExecutorFactory executorFactory;
    try {
        executorFactory = executorServiceLoader.getExecutorFactory(configuration);
    } catch (Exception e) {
        throw new RuntimeException("Could not retrieve ExecutorFactory.", e);
    }

    final PipelineExecutor executor = executorFactory.getExecutor(configuration);
    CompletableFuture<JobClient> jobClient;
    try {
        jobClient = executor.execute(pipeline, configuration);
    } catch (Exception e) {
        throw new RuntimeException("Could not execute program.", e);
    }
    return jobClient;
}
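Because DeploymentOptions.ATTACHED was set to false above, the executor submits the job and completes the future as soon as submission succeeds, so deployer.deploy().get() back in executeUpdateInternal only blocks until the JobClient is available, not until the INSERT job finishes. Which PipelineExecutor is used depends on execution.target in the Flink configuration (for example a standalone session cluster or YARN).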
Original article: https://github.com/y0908105023/wiki/wiki/Flink-Sql-Client-%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90
Author: y0908105023