| 1 |
|
%% MySQL/OTP – MySQL client library for Erlang/OTP |
| 2 |
|
%% Copyright (C) 2014-2018 Viktor Söderqvist |
| 3 |
|
%% |
| 4 |
|
%% This file is part of MySQL/OTP. |
| 5 |
|
%% |
| 6 |
|
%% MySQL/OTP is free software: you can redistribute it and/or modify it under |
| 7 |
|
%% the terms of the GNU Lesser General Public License as published by the Free |
| 8 |
|
%% Software Foundation, either version 3 of the License, or (at your option) |
| 9 |
|
%% any later version. |
| 10 |
|
%% |
| 11 |
|
%% This program is distributed in the hope that it will be useful, but WITHOUT |
| 12 |
|
%% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 13 |
|
%% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for |
| 14 |
|
%% more details. |
| 15 |
|
%% |
| 16 |
|
%% You should have received a copy of the GNU Lesser General Public License |
| 17 |
|
%% along with this program. If not, see <https://www.gnu.org/licenses/>. |
| 18 |
|
|
| 19 |
|
%% @doc This module implements parts of the MySQL client/server protocol. |
| 20 |
|
%% |
| 21 |
|
%% The protocol is described in the document "MySQL Internals" which can be |
| 22 |
|
%% found under "MySQL Documentation: Expert Guides" on http://dev.mysql.com/. |
| 23 |
|
%% |
| 24 |
|
%% TCP communication is not handled in this module. Most of the public functions |
| 25 |
|
%% take funs for data communitaction as parameters. |
| 26 |
|
%% @private |
| 27 |
|
-module(mysql_conn). |
| 28 |
|
|
| 29 |
|
-behaviour(gen_server). |
| 30 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, |
| 31 |
|
code_change/3]). |
| 32 |
|
|
| 33 |
|
-define(default_host, "localhost"). |
| 34 |
|
-define(default_port, 3306). |
| 35 |
|
-define(default_user, <<>>). |
| 36 |
|
-define(default_password, <<>>). |
| 37 |
|
-define(default_connect_timeout, 5000). |
| 38 |
|
-define(default_query_timeout, infinity). |
| 39 |
|
-define(default_query_cache_time, 60000). %% for query/3. |
| 40 |
|
-define(default_ping_timeout, 60000). |
| 41 |
|
|
| 42 |
|
-define(cmd_timeout, 3000). %% Timeout used for various commands to the server |
| 43 |
|
|
| 44 |
|
%% Errors that cause "implicit rollback" |
| 45 |
|
-define(ERROR_DEADLOCK, 1213). |
| 46 |
|
|
| 47 |
|
%% --- Gen_server callbacks --- |
| 48 |
|
|
| 49 |
|
-include("records.hrl"). |
| 50 |
|
-include("server_status.hrl"). |
| 51 |
|
|
| 52 |
|
%% Gen_server state |
| 53 |
|
-record(state, {server_version, connection_id, socket, sockmod, tcp_opts, ssl_opts, |
| 54 |
|
host, port, user, password, database, queries, prepares, |
| 55 |
|
auth_plugin_name, auth_plugin_data, allowed_local_paths, |
| 56 |
|
log_warnings, log_slow_queries, |
| 57 |
|
connect_timeout, ping_timeout, query_timeout, query_cache_time, |
| 58 |
|
affected_rows = 0, status = 0, warning_count = 0, insert_id = 0, |
| 59 |
|
transaction_levels = [], ping_ref = undefined, |
| 60 |
|
stmts = dict:new(), query_cache = empty, cap_found_rows = false}). |
| 61 |
|
|
| 62 |
|
%% @private |
| 63 |
|
init(Opts) -> |
| 64 |
|
%% Connect |
| 65 |
40 |
Host = proplists:get_value(host, Opts, ?default_host), |
| 66 |
|
|
| 67 |
40 |
DefaultPort = case Host of |
| 68 |
1 |
{local, _LocalAddr} -> 0; |
| 69 |
39 |
_NonLocalAddr -> ?default_port |
| 70 |
|
end, |
| 71 |
40 |
Port = proplists:get_value(port, Opts, DefaultPort), |
| 72 |
|
|
| 73 |
40 |
User = proplists:get_value(user, Opts, ?default_user), |
| 74 |
40 |
Password = proplists:get_value(password, Opts, ?default_password), |
| 75 |
40 |
Database = proplists:get_value(database, Opts, undefined), |
| 76 |
40 |
AllowedLocalPaths = proplists:get_value(allowed_local_paths, Opts, []), |
| 77 |
40 |
LogWarn = proplists:get_value(log_warnings, Opts, true), |
| 78 |
40 |
LogSlow = proplists:get_value(log_slow_queries, Opts, false), |
| 79 |
40 |
KeepAlive = proplists:get_value(keep_alive, Opts, false), |
| 80 |
40 |
ConnectTimeout = proplists:get_value(connect_timeout, Opts, |
| 81 |
|
?default_connect_timeout), |
| 82 |
40 |
QueryTimeout = proplists:get_value(query_timeout, Opts, |
| 83 |
|
?default_query_timeout), |
| 84 |
40 |
QueryCacheTime = proplists:get_value(query_cache_time, Opts, |
| 85 |
|
?default_query_cache_time), |
| 86 |
40 |
TcpOpts = proplists:get_value(tcp_options, Opts, []), |
| 87 |
40 |
SetFoundRows = proplists:get_value(found_rows, Opts, false), |
| 88 |
40 |
SSLOpts = proplists:get_value(ssl, Opts, undefined), |
| 89 |
|
|
| 90 |
40 |
Queries = proplists:get_value(queries, Opts, []), |
| 91 |
40 |
Prepares = proplists:get_value(prepare, Opts, []), |
| 92 |
|
|
| 93 |
40 |
true = lists:all(fun mysql_protocol:valid_path/1, AllowedLocalPaths), |
| 94 |
|
|
| 95 |
40 |
PingTimeout = case KeepAlive of |
| 96 |
4 |
true -> ?default_ping_timeout; |
| 97 |
35 |
false -> infinity; |
| 98 |
1 |
N when N > 0 -> N |
| 99 |
|
end, |
| 100 |
|
|
| 101 |
40 |
State0 = #state{ |
| 102 |
|
tcp_opts = TcpOpts, |
| 103 |
|
ssl_opts = SSLOpts, |
| 104 |
|
host = Host, port = Port, |
| 105 |
|
user = User, password = Password, |
| 106 |
|
database = Database, |
| 107 |
|
allowed_local_paths = AllowedLocalPaths, |
| 108 |
|
queries = Queries, prepares = Prepares, |
| 109 |
|
log_warnings = LogWarn, log_slow_queries = LogSlow, |
| 110 |
|
connect_timeout = ConnectTimeout, |
| 111 |
|
ping_timeout = PingTimeout, |
| 112 |
|
query_timeout = QueryTimeout, |
| 113 |
|
query_cache_time = QueryCacheTime, |
| 114 |
|
cap_found_rows = (SetFoundRows =:= true) |
| 115 |
|
}, |
| 116 |
|
|
| 117 |
40 |
case proplists:get_value(connect_mode, Opts, synchronous) of |
| 118 |
|
synchronous -> |
| 119 |
37 |
case connect(State0) of |
| 120 |
|
{ok, State1} -> |
| 121 |
34 |
{ok, State1}; |
| 122 |
|
{error, Reason} -> |
| 123 |
3 |
{stop, Reason} |
| 124 |
|
end; |
| 125 |
|
asynchronous -> |
| 126 |
2 |
gen_server:cast(self(), connect), |
| 127 |
2 |
{ok, State0}; |
| 128 |
|
lazy -> |
| 129 |
1 |
{ok, State0} |
| 130 |
|
end. |
| 131 |
|
|
| 132 |
|
connect(#state{connect_timeout = ConnectTimeout} = State) -> |
| 133 |
40 |
MainPid = self(), |
| 134 |
40 |
Pid = spawn_link( |
| 135 |
|
fun () -> |
| 136 |
40 |
{ok, State1}=connect_socket(State), |
| 137 |
40 |
case handshake(State1) of |
| 138 |
|
{ok, #state{sockmod = SockMod, socket = Socket} = State2} -> |
| 139 |
38 |
SockMod:controlling_process(Socket, MainPid), |
| 140 |
38 |
MainPid ! {self(), {ok, State2}}; |
| 141 |
|
{error, _} = E -> |
| 142 |
2 |
MainPid ! {self(), E} |
| 143 |
|
end |
| 144 |
|
end |
| 145 |
|
), |
| 146 |
40 |
receive |
| 147 |
|
{Pid, {ok, State3}} -> |
| 148 |
38 |
post_connect(State3); |
| 149 |
|
{Pid, {error, _} = E} -> |
| 150 |
2 |
E |
| 151 |
|
after ConnectTimeout -> |
| 152 |
:-( |
unlink(Pid), |
| 153 |
:-( |
exit(Pid, kill), |
| 154 |
:-( |
{error, timeout} |
| 155 |
|
end. |
| 156 |
|
|
| 157 |
|
connect_socket(#state{tcp_opts = TcpOpts, host = Host, port = Port} = State) -> |
| 158 |
|
%% Connect socket |
| 159 |
40 |
SockOpts = sanitize_tcp_opts(TcpOpts), |
| 160 |
40 |
{ok, Socket} = gen_tcp:connect(Host, Port, SockOpts), |
| 161 |
|
|
| 162 |
|
%% If buffer wasn't specifically defined make it at least as |
| 163 |
|
%% large as recbuf, as suggested by the inet:setopts() docs. |
| 164 |
40 |
case proplists:is_defined(buffer, TcpOpts) of |
| 165 |
|
true -> |
| 166 |
:-( |
ok; |
| 167 |
|
false -> |
| 168 |
40 |
{ok, [{buffer, Buffer}]} = inet:getopts(Socket, [buffer]), |
| 169 |
40 |
{ok, [{recbuf, Recbuf}]} = inet:getopts(Socket, [recbuf]), |
| 170 |
40 |
ok = inet:setopts(Socket, [{buffer, max(Buffer, Recbuf)}]) |
| 171 |
|
end, |
| 172 |
|
|
| 173 |
40 |
{ok, State#state{socket = Socket}}. |
| 174 |
|
|
| 175 |
|
sanitize_tcp_opts([{inet_backend, _} = InetBackend | TcpOpts0]) -> |
| 176 |
|
%% This option is be used to turn on the experimental socket backend for |
| 177 |
|
%% gen_tcp/inet (OTP/23). If given, it must remain the first option in the |
| 178 |
|
%% list. |
| 179 |
:-( |
[InetBackend | sanitize_tcp_opts(TcpOpts0)]; |
| 180 |
|
sanitize_tcp_opts(TcpOpts0) -> |
| 181 |
40 |
TcpOpts1 = lists:filter( |
| 182 |
|
fun |
| 183 |
:-( |
({mode, _}) -> false; |
| 184 |
:-( |
(binary) -> false; |
| 185 |
:-( |
(list) -> false; |
| 186 |
:-( |
({packet, _}) -> false; |
| 187 |
:-( |
({active, _}) -> false; |
| 188 |
:-( |
(_) -> true |
| 189 |
|
end, |
| 190 |
|
TcpOpts0 |
| 191 |
|
), |
| 192 |
40 |
TcpOpts2 = case lists:keymember(nodelay, 1, TcpOpts1) of |
| 193 |
:-( |
true -> TcpOpts1; |
| 194 |
40 |
false -> [{nodelay, true} | TcpOpts1] |
| 195 |
|
end, |
| 196 |
40 |
[binary, {packet, raw}, {active, false} | TcpOpts2]. |
| 197 |
|
|
| 198 |
|
handshake(#state{socket = Socket0, ssl_opts = SSLOpts, |
| 199 |
|
host = Host, user = User, password = Password, database = Database, |
| 200 |
|
cap_found_rows = SetFoundRows} = State0) -> |
| 201 |
|
%% Exchange handshake communication. |
| 202 |
40 |
Result = mysql_protocol:handshake(Host, User, Password, Database, gen_tcp, |
| 203 |
|
SSLOpts, Socket0, SetFoundRows), |
| 204 |
40 |
case Result of |
| 205 |
|
{ok, Handshake, SockMod, Socket} -> |
| 206 |
38 |
setopts(SockMod, Socket, [{active, once}]), |
| 207 |
|
#handshake{server_version = Version, connection_id = ConnId, |
| 208 |
|
status = Status, |
| 209 |
|
auth_plugin_name = AuthPluginName, |
| 210 |
38 |
auth_plugin_data = AuthPluginData} = Handshake, |
| 211 |
38 |
State1 = State0#state{server_version = Version, connection_id = ConnId, |
| 212 |
|
sockmod = SockMod, |
| 213 |
|
socket = Socket, |
| 214 |
|
auth_plugin_name = AuthPluginName, |
| 215 |
|
auth_plugin_data = AuthPluginData, |
| 216 |
|
status = Status}, |
| 217 |
38 |
{ok, State1}; |
| 218 |
|
#error{} = E -> |
| 219 |
2 |
{error, error_to_reason(E)} |
| 220 |
|
end. |
| 221 |
|
|
| 222 |
|
post_connect(#state{queries = Queries, prepares = Prepares} = State) -> |
| 223 |
46 |
case execute_on_connect(Queries, Prepares, State) of |
| 224 |
|
{ok, State1} -> |
| 225 |
42 |
process_flag(trap_exit, true), |
| 226 |
42 |
State2 = schedule_ping(State1), |
| 227 |
42 |
{ok, State2}; |
| 228 |
|
{error, _} = E -> |
| 229 |
4 |
E |
| 230 |
|
end. |
| 231 |
|
|
| 232 |
|
execute_on_connect([], [], State) -> |
| 233 |
42 |
{ok, State}; |
| 234 |
|
execute_on_connect([], [{Name, Stmt}|Prepares], State) -> |
| 235 |
5 |
case named_prepare(Name, Stmt, State) of |
| 236 |
|
{{ok, Name}, State1} -> |
| 237 |
3 |
execute_on_connect([], Prepares, State1); |
| 238 |
|
{{error, _} = E, _} -> |
| 239 |
2 |
E |
| 240 |
|
end; |
| 241 |
|
execute_on_connect([Query|Queries], Prepares, State) -> |
| 242 |
9 |
case query(Query, no_filtermap_fun, default_timeout, State) of |
| 243 |
|
{ok, State1} -> |
| 244 |
3 |
execute_on_connect(Queries, Prepares, State1); |
| 245 |
|
{{ok, _}, State1} -> |
| 246 |
2 |
execute_on_connect(Queries, Prepares, State1); |
| 247 |
|
{{ok, _, _}, State1} -> |
| 248 |
2 |
execute_on_connect(Queries, Prepares, State1); |
| 249 |
|
{{error, _} = E, _} -> |
| 250 |
2 |
E |
| 251 |
|
end. |
| 252 |
|
|
| 253 |
|
%% @private |
| 254 |
|
%% @doc |
| 255 |
|
%% |
| 256 |
|
%% Query and execute calls: |
| 257 |
|
%% |
| 258 |
|
%% <ul> |
| 259 |
|
%% <li>{query, Query, FilterMap, Timeout}</li> |
| 260 |
|
%% <li>{param_query, Query, Params, FilterMap, Timeout}</li> |
| 261 |
|
%% <li>{execute, Stmt, Args, FilterMap, Timeout}</li> |
| 262 |
|
%% </ul> |
| 263 |
|
%% |
| 264 |
|
%% For the calls listed above, we return these values: |
| 265 |
|
%% |
| 266 |
|
%% <dl> |
| 267 |
|
%% <dt>`ok'</dt> |
| 268 |
|
%% <dd>Success without returning any table data (UPDATE, etc.)</dd> |
| 269 |
|
%% <dt>`{ok, ColumnNames, Rows}'</dt> |
| 270 |
|
%% <dd>Queries returning one result set of table data</dd> |
| 271 |
|
%% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt> |
| 272 |
|
%% <dd>Queries returning more than one result set of table data</dd> |
| 273 |
|
%% <dt>`{error, ServerReason}'</dt> |
| 274 |
|
%% <dd>MySQL server error</dd> |
| 275 |
|
%% <dt>`{implicit_commit, NestingLevel, Query}'</dt> |
| 276 |
|
%% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in |
| 277 |
|
%% an implicit commit. |
| 278 |
|
%% |
| 279 |
|
%% If the caller is in a (nested) transaction, it must be aborted. To be |
| 280 |
|
%% able to handle this in the caller's process, we also return the |
| 281 |
|
%% nesting level.</dd> |
| 282 |
|
%% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt> |
| 283 |
|
%% <dd>This errors results in an implicit rollback: `{1213, <<"40001">>, |
| 284 |
|
%% <<"Deadlock found when trying to get lock; try restarting " |
| 285 |
|
%% "transaction">>}'. |
| 286 |
|
%% |
| 287 |
|
%% If the caller is in a (nested) transaction, it must be aborted. To be |
| 288 |
|
%% able to handle this in the caller's process, we also return the |
| 289 |
|
%% nesting level.</dd> |
| 290 |
|
%% </dl> |
| 291 |
|
handle_call(is_connected, _, #state{socket = Socket} = State) -> |
| 292 |
4 |
{reply, Socket =/= undefined, State}; |
| 293 |
|
handle_call(Msg, From, #state{socket = undefined} = State) -> |
| 294 |
1 |
case connect(State) of |
| 295 |
|
{ok, State1} -> |
| 296 |
1 |
handle_call(Msg, From, State1); |
| 297 |
|
{error, _} = E -> |
| 298 |
:-( |
{stop, E, State} |
| 299 |
|
end; |
| 300 |
|
handle_call({query, Query, FilterMap, Timeout}, _From, State) -> |
| 301 |
450 |
{Reply, State1} = query(Query, FilterMap, Timeout, State), |
| 302 |
450 |
{reply, Reply, State1}; |
| 303 |
|
handle_call({param_query, Query, Params, FilterMap, default_timeout}, From, |
| 304 |
|
State) -> |
| 305 |
12 |
handle_call({param_query, Query, Params, FilterMap, |
| 306 |
|
State#state.query_timeout}, From, State); |
| 307 |
|
handle_call({param_query, Query, Params, FilterMap, Timeout}, _From, |
| 308 |
|
#state{socket = Socket, sockmod = SockMod} = State) -> |
| 309 |
|
%% Parametrized query: Prepared statement cached with the query as the key |
| 310 |
13 |
QueryBin = iolist_to_binary(Query), |
| 311 |
13 |
Cache = State#state.query_cache, |
| 312 |
13 |
{StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of |
| 313 |
|
{found, FoundStmt, NewCache} -> |
| 314 |
|
%% Found |
| 315 |
2 |
{{ok, FoundStmt}, NewCache}; |
| 316 |
|
not_found -> |
| 317 |
|
%% Prepare |
| 318 |
11 |
setopts(SockMod, Socket, [{active, false}]), |
| 319 |
11 |
Rec = mysql_protocol:prepare(Query, SockMod, Socket), |
| 320 |
11 |
setopts(SockMod, Socket, [{active, once}]), |
| 321 |
11 |
case Rec of |
| 322 |
|
#error{} = E -> |
| 323 |
1 |
{{error, error_to_reason(E)}, Cache}; |
| 324 |
|
#prepared{} = Stmt -> |
| 325 |
|
%% If the first entry in the cache, start the timer. |
| 326 |
10 |
Cache == empty andalso begin |
| 327 |
6 |
When = State#state.query_cache_time * 2, |
| 328 |
6 |
erlang:send_after(When, self(), query_cache) |
| 329 |
|
end, |
| 330 |
10 |
{{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)} |
| 331 |
|
end |
| 332 |
|
end, |
| 333 |
13 |
case StmtResult of |
| 334 |
|
{ok, StmtRec} -> |
| 335 |
12 |
State1 = State#state{query_cache = Cache1}, |
| 336 |
12 |
{Reply, State2} = execute_stmt(StmtRec, Params, FilterMap, Timeout, State1), |
| 337 |
12 |
{reply, Reply, State2}; |
| 338 |
|
PrepareError -> |
| 339 |
1 |
{reply, PrepareError, State} |
| 340 |
|
end; |
| 341 |
|
handle_call({execute, Stmt, Args, FilterMap, default_timeout}, From, State) -> |
| 342 |
186 |
handle_call({execute, Stmt, Args, FilterMap, State#state.query_timeout}, |
| 343 |
|
From, State); |
| 344 |
|
handle_call({execute, Stmt, Args, FilterMap, Timeout}, _From, State) -> |
| 345 |
187 |
case dict:find(Stmt, State#state.stmts) of |
| 346 |
|
{ok, StmtRec} -> |
| 347 |
186 |
{Reply, State1} = execute_stmt(StmtRec, Args, FilterMap, Timeout, State), |
| 348 |
186 |
{reply, Reply, State1}; |
| 349 |
|
error -> |
| 350 |
1 |
{reply, {error, not_prepared}, State} |
| 351 |
|
end; |
| 352 |
|
handle_call({prepare, Query}, _From, State) -> |
| 353 |
93 |
#state{socket = Socket, sockmod = SockMod} = State, |
| 354 |
93 |
setopts(SockMod, Socket, [{active, false}]), |
| 355 |
93 |
Rec = mysql_protocol:prepare(Query, SockMod, Socket), |
| 356 |
93 |
setopts(SockMod, Socket, [{active, once}]), |
| 357 |
93 |
State1 = update_state(Rec, State), |
| 358 |
93 |
case Rec of |
| 359 |
|
#error{} = E -> |
| 360 |
1 |
{reply, {error, error_to_reason(E)}, State1}; |
| 361 |
|
#prepared{statement_id = Id} = Stmt -> |
| 362 |
92 |
Stmts1 = dict:store(Id, Stmt, State1#state.stmts), |
| 363 |
92 |
State2 = State#state{stmts = Stmts1}, |
| 364 |
92 |
{reply, {ok, Id}, State2} |
| 365 |
|
end; |
| 366 |
|
handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) -> |
| 367 |
7 |
{Reply, State1} = named_prepare(Name, Query, State), |
| 368 |
7 |
{reply, Reply, State1}; |
| 369 |
|
handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt); |
| 370 |
|
is_integer(Stmt) -> |
| 371 |
91 |
case dict:find(Stmt, State#state.stmts) of |
| 372 |
|
{ok, StmtRec} -> |
| 373 |
89 |
#state{socket = Socket, sockmod = SockMod} = State, |
| 374 |
89 |
setopts(SockMod, Socket, [{active, false}]), |
| 375 |
89 |
mysql_protocol:unprepare(StmtRec, SockMod, Socket), |
| 376 |
89 |
setopts(SockMod, Socket, [{active, once}]), |
| 377 |
89 |
State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)}, |
| 378 |
89 |
State2 = schedule_ping(State1), |
| 379 |
89 |
{reply, ok, State2}; |
| 380 |
|
error -> |
| 381 |
2 |
{reply, {error, not_prepared}, State} |
| 382 |
|
end; |
| 383 |
|
handle_call({change_user, Username, Password, Options}, From, |
| 384 |
|
State = #state{transaction_levels = []}) -> |
| 385 |
|
#state{socket = Socket, sockmod = SockMod, |
| 386 |
|
auth_plugin_name = AuthPluginName, |
| 387 |
|
auth_plugin_data = AuthPluginData, |
| 388 |
9 |
server_version = ServerVersion} = State, |
| 389 |
9 |
Database = proplists:get_value(database, Options, undefined), |
| 390 |
9 |
Queries = proplists:get_value(queries, Options, []), |
| 391 |
9 |
Prepares = proplists:get_value(prepare, Options, []), |
| 392 |
9 |
setopts(SockMod, Socket, [{active, false}]), |
| 393 |
9 |
Result = mysql_protocol:change_user(SockMod, Socket, Username, Password, |
| 394 |
|
AuthPluginName, AuthPluginData, Database, |
| 395 |
|
ServerVersion), |
| 396 |
9 |
setopts(SockMod, Socket, [{active, once}]), |
| 397 |
9 |
State1 = update_state(Result, State), |
| 398 |
9 |
State1#state.warning_count > 0 andalso State1#state.log_warnings |
| 399 |
:-( |
andalso log_warnings(State1, "CHANGE USER"), |
| 400 |
9 |
State2 = State1#state{query_cache = empty, stmts = dict:new()}, |
| 401 |
9 |
case Result of |
| 402 |
|
#ok{} -> |
| 403 |
8 |
State3 = State2#state{user = Username, password = Password, |
| 404 |
|
database=Database, queries=Queries, |
| 405 |
|
prepares=Prepares}, |
| 406 |
8 |
case post_connect(State3) of |
| 407 |
|
{ok, State4} -> |
| 408 |
6 |
{reply, ok, State4}; |
| 409 |
|
{error, Reason} = E -> |
| 410 |
2 |
gen_server:reply(From, E), |
| 411 |
2 |
stop_server(Reason, State3) |
| 412 |
|
end; |
| 413 |
|
#error{} = E -> |
| 414 |
1 |
gen_server:reply(From, {error, error_to_reason(E)}), |
| 415 |
1 |
stop_server(change_user_failed, State2) |
| 416 |
|
end; |
| 417 |
|
handle_call(reset_connection, _From, #state{socket = Socket, sockmod = SockMod} = State) -> |
| 418 |
1 |
setopts(SockMod, Socket, [{active, false}]), |
| 419 |
1 |
Result = mysql_protocol:reset_connnection(SockMod, Socket), |
| 420 |
1 |
setopts(SockMod, Socket, [{active, once}]), |
| 421 |
1 |
State1 = update_state(Result, State), |
| 422 |
1 |
Reply = case Result of |
| 423 |
:-( |
#ok{} -> ok; |
| 424 |
|
#error{} = E -> |
| 425 |
|
%% 'COM_RESET_CONNECTION' is added in MySQL 5.7 and MariaDB 10 |
| 426 |
|
%% "Unkown command" is returned when MySQL =< 5.6 or MariaDB =< 5.5 |
| 427 |
1 |
{error, error_to_reason(E)} |
| 428 |
|
end, |
| 429 |
1 |
{reply, Reply, State1}; |
| 430 |
|
|
| 431 |
|
handle_call(warning_count, _From, State) -> |
| 432 |
2 |
{reply, State#state.warning_count, State}; |
| 433 |
|
handle_call(insert_id, _From, State) -> |
| 434 |
3 |
{reply, State#state.insert_id, State}; |
| 435 |
|
handle_call(affected_rows, _From, State) -> |
| 436 |
3 |
{reply, State#state.affected_rows, State}; |
| 437 |
|
handle_call(autocommit, _From, State) -> |
| 438 |
3 |
{reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State}; |
| 439 |
|
handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) -> |
| 440 |
2 |
{reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State}; |
| 441 |
|
handle_call(in_transaction, _From, State) -> |
| 442 |
25 |
{reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State}; |
| 443 |
|
handle_call(start_transaction, {FromPid, _}, |
| 444 |
|
State = #state{socket = Socket, sockmod = SockMod, |
| 445 |
|
transaction_levels = L, status = Status}) |
| 446 |
|
when Status band ?SERVER_STATUS_IN_TRANS == 0, L == []; |
| 447 |
|
Status band ?SERVER_STATUS_IN_TRANS /= 0, L /= [] -> |
| 448 |
28 |
MRef = erlang:monitor(process, FromPid), |
| 449 |
28 |
Query = case L of |
| 450 |
17 |
[] -> <<"BEGIN">>; |
| 451 |
11 |
_ -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>> |
| 452 |
|
end, |
| 453 |
28 |
setopts(SockMod, Socket, [{active, false}]), |
| 454 |
28 |
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket, |
| 455 |
|
[], no_filtermap_fun, |
| 456 |
|
?cmd_timeout), |
| 457 |
28 |
setopts(SockMod, Socket, [{active, once}]), |
| 458 |
28 |
State1 = update_state(Res, State), |
| 459 |
28 |
{reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}}; |
| 460 |
|
handle_call(rollback, {FromPid, _}, |
| 461 |
|
State = #state{socket = Socket, sockmod = SockMod, status = Status, |
| 462 |
|
transaction_levels = [{FromPid, MRef} | L]}) |
| 463 |
|
when Status band ?SERVER_STATUS_IN_TRANS /= 0 -> |
| 464 |
6 |
erlang:demonitor(MRef), |
| 465 |
6 |
Query = case L of |
| 466 |
5 |
[] -> <<"ROLLBACK">>; |
| 467 |
1 |
_ -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>> |
| 468 |
|
end, |
| 469 |
6 |
setopts(SockMod, Socket, [{active, false}]), |
| 470 |
6 |
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket, |
| 471 |
|
[], no_filtermap_fun, |
| 472 |
|
?cmd_timeout), |
| 473 |
6 |
setopts(SockMod, Socket, [{active, once}]), |
| 474 |
6 |
State1 = update_state(Res, State), |
| 475 |
6 |
{reply, ok, State1#state{transaction_levels = L}}; |
| 476 |
|
handle_call(commit, {FromPid, _}, |
| 477 |
|
State = #state{socket = Socket, sockmod = SockMod, status = Status, |
| 478 |
|
transaction_levels = [{FromPid, MRef} | L]}) |
| 479 |
|
when Status band ?SERVER_STATUS_IN_TRANS /= 0 -> |
| 480 |
16 |
erlang:demonitor(MRef), |
| 481 |
16 |
Query = case L of |
| 482 |
10 |
[] -> <<"COMMIT">>; |
| 483 |
6 |
_ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>> |
| 484 |
|
end, |
| 485 |
16 |
setopts(SockMod, Socket, [{active, false}]), |
| 486 |
16 |
{ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket, |
| 487 |
|
[], no_filtermap_fun, |
| 488 |
|
?cmd_timeout), |
| 489 |
16 |
setopts(SockMod, Socket, [{active, once}]), |
| 490 |
16 |
State1 = update_state(Res, State), |
| 491 |
16 |
{reply, ok, State1#state{transaction_levels = L}}. |
| 492 |
|
|
| 493 |
|
%% @private |
| 494 |
|
handle_cast(connect, #state{socket = undefined} = State) -> |
| 495 |
2 |
case connect(State) of |
| 496 |
|
{ok, State1} -> |
| 497 |
1 |
{noreply, State1}; |
| 498 |
|
{error, _} = E -> |
| 499 |
1 |
{stop, E, State} |
| 500 |
|
end; |
| 501 |
|
handle_cast(connect, State) -> |
| 502 |
:-( |
{noreply, State}; |
| 503 |
|
handle_cast(_Msg, State) -> |
| 504 |
1 |
{noreply, State}. |
| 505 |
|
|
| 506 |
|
%% @private |
| 507 |
|
handle_info(query_cache, #state{query_cache = Cache, |
| 508 |
|
query_cache_time = CacheTime} = State) -> |
| 509 |
|
%% Evict expired queries/statements in the cache used by query/3. |
| 510 |
1 |
{Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime), |
| 511 |
|
%% Unprepare the evicted statements |
| 512 |
1 |
#state{socket = Socket, sockmod = SockMod} = State, |
| 513 |
1 |
setopts(SockMod, Socket, [{active, false}]), |
| 514 |
1 |
lists:foreach(fun ({_Query, Stmt}) -> |
| 515 |
1 |
mysql_protocol:unprepare(Stmt, SockMod, Socket) |
| 516 |
|
end, |
| 517 |
|
Evicted), |
| 518 |
1 |
setopts(SockMod, Socket, [{active, once}]), |
| 519 |
|
%% If nonempty, schedule eviction again. |
| 520 |
1 |
mysql_cache:size(Cache1) > 0 andalso |
| 521 |
:-( |
erlang:send_after(CacheTime, self(), query_cache), |
| 522 |
1 |
{noreply, State#state{query_cache = Cache1}}; |
| 523 |
|
handle_info({'DOWN', _MRef, _, Pid, _Info}, State) -> |
| 524 |
1 |
stop_server({application_process_died, Pid}, State); |
| 525 |
|
handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) -> |
| 526 |
4 |
setopts(SockMod, Socket, [{active, false}]), |
| 527 |
4 |
#ok{} = mysql_protocol:ping(SockMod, Socket), |
| 528 |
3 |
setopts(SockMod, Socket, [{active, once}]), |
| 529 |
3 |
{noreply, schedule_ping(State)}; |
| 530 |
|
handle_info({tcp_closed, _Socket}, State) -> |
| 531 |
1 |
{stop, normal, State#state{socket = undefined, connection_id = undefined}}; |
| 532 |
|
handle_info({tcp_error, _Socket, Reason}, State) -> |
| 533 |
1 |
stop_server({tcp_error, Reason}, State); |
| 534 |
|
handle_info(_Info, State) -> |
| 535 |
2 |
{noreply, State}. |
| 536 |
|
|
| 537 |
|
%% @private |
| 538 |
|
terminate(Reason, #state{socket = Socket, sockmod = SockMod}) |
| 539 |
|
when Socket =/= undefined andalso (Reason == normal orelse Reason == shutdown) -> |
| 540 |
|
%% Send the goodbye message for politeness. |
| 541 |
29 |
setopts(SockMod, Socket, [{active, false}]), |
| 542 |
29 |
mysql_protocol:quit(SockMod, Socket); |
| 543 |
|
terminate(_Reason, _State) -> |
| 544 |
9 |
ok. |
| 545 |
|
|
| 546 |
|
%% @private |
| 547 |
|
code_change(_OldVsn, State = #state{}, _Extra) -> |
| 548 |
1 |
{ok, State}; |
| 549 |
|
code_change(_OldVsn, _State, _Extra) -> |
| 550 |
1 |
{error, incompatible_state}. |
| 551 |
|
|
| 552 |
|
%% --- Helpers --- |
| 553 |
|
|
| 554 |
|
%% @doc Executes a prepared statement and returns {Reply, NewState}. |
| 555 |
|
execute_stmt(Stmt, Args, FilterMap, Timeout, State) -> |
| 556 |
|
#state{socket = Socket, sockmod = SockMod, |
| 557 |
198 |
allowed_local_paths = AllowedPaths} = State, |
| 558 |
198 |
setopts(SockMod, Socket, [{active, false}]), |
| 559 |
198 |
{ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket, |
| 560 |
|
AllowedPaths, FilterMap, |
| 561 |
|
Timeout) of |
| 562 |
|
{error, timeout} when State#state.server_version >= [5, 0, 0] -> |
| 563 |
2 |
kill_query(State), |
| 564 |
2 |
mysql_protocol:fetch_execute_response(SockMod, Socket, |
| 565 |
|
[], FilterMap, ?cmd_timeout); |
| 566 |
|
{error, timeout} -> |
| 567 |
|
%% For MySQL 4.x.x there is no way to recover from timeout except |
| 568 |
|
%% killing the connection itself. |
| 569 |
:-( |
exit(timeout); |
| 570 |
|
QueryResult -> |
| 571 |
196 |
QueryResult |
| 572 |
|
end, |
| 573 |
198 |
setopts(SockMod, Socket, [{active, once}]), |
| 574 |
198 |
State1 = lists:foldl(fun update_state/2, State, Recs), |
| 575 |
198 |
State1#state.warning_count > 0 andalso State1#state.log_warnings |
| 576 |
2 |
andalso log_warnings(State1, Stmt#prepared.orig_query), |
| 577 |
198 |
handle_query_call_result(Recs, Stmt#prepared.orig_query, State1). |
| 578 |
|
|
| 579 |
|
%% @doc Produces a tuple to return as an error reason. |
| 580 |
|
-spec error_to_reason(#error{}) -> mysql:server_reason(). |
| 581 |
|
error_to_reason(#error{code = Code, state = State, msg = Msg}) -> |
| 582 |
19 |
{Code, State, Msg}. |
| 583 |
|
|
| 584 |
|
%% @doc Updates a state with information from a response. Also re-schedules |
| 585 |
|
%% ping. |
| 586 |
|
-spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}. |
| 587 |
|
update_state(Rec, State) -> |
| 588 |
838 |
State1 = case Rec of |
| 589 |
|
#ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} -> |
| 590 |
440 |
State#state{status = S, affected_rows = R, insert_id = Id, |
| 591 |
|
warning_count = W}; |
| 592 |
|
#resultset{status = S, warning_count = W} -> |
| 593 |
281 |
State#state{status = S, warning_count = W}; |
| 594 |
|
#prepared{warning_count = W} -> |
| 595 |
101 |
State#state{warning_count = W}; |
| 596 |
|
_Other -> |
| 597 |
|
%% This includes errors. |
| 598 |
|
%% Reset some things. (Note: We don't reset status and insert_id.) |
| 599 |
16 |
State#state{warning_count = 0, affected_rows = 0} |
| 600 |
|
end, |
| 601 |
838 |
schedule_ping(State1). |
| 602 |
|
|
| 603 |
|
%% @doc executes an unparameterized query and returns {Reply, NewState}. |
| 604 |
|
query(Query, FilterMap, default_timeout, |
| 605 |
|
#state{query_timeout = DefaultTimeout} = State) -> |
| 606 |
457 |
query(Query, FilterMap, DefaultTimeout, State); |
| 607 |
|
query(Query, FilterMap, Timeout, State) -> |
| 608 |
|
#state{sockmod = SockMod, socket = Socket, |
| 609 |
459 |
allowed_local_paths = AllowedPaths} = State, |
| 610 |
459 |
setopts(SockMod, Socket, [{active, false}]), |
| 611 |
459 |
Result = mysql_protocol:query(Query, SockMod, Socket, AllowedPaths, |
| 612 |
|
FilterMap, Timeout), |
| 613 |
459 |
{ok, Recs} = case Result of |
| 614 |
|
{error, timeout} when State#state.server_version >= [5, 0, 0] -> |
| 615 |
1 |
kill_query(State), |
| 616 |
1 |
mysql_protocol:fetch_query_response(SockMod, Socket, |
| 617 |
|
[], FilterMap, |
| 618 |
|
?cmd_timeout); |
| 619 |
|
{error, timeout} -> |
| 620 |
|
%% For MySQL 4.x.x there is no way to recover from timeout except |
| 621 |
|
%% killing the connection itself. |
| 622 |
:-( |
exit(timeout); |
| 623 |
|
QueryResult -> |
| 624 |
458 |
QueryResult |
| 625 |
|
end, |
| 626 |
459 |
setopts(SockMod, Socket, [{active, once}]), |
| 627 |
459 |
State1 = lists:foldl(fun update_state/2, State, Recs), |
| 628 |
459 |
State1#state.warning_count > 0 andalso State1#state.log_warnings |
| 629 |
1 |
andalso log_warnings(State1, Query), |
| 630 |
459 |
handle_query_call_result(Recs, Query, State1). |
| 631 |
|
|
| 632 |
|
%% @doc Prepares a named query and returns {{ok, Name}, NewState} or |
| 633 |
|
%% {{error, Reason}, NewState}. |
| 634 |
|
named_prepare(Name, Query, State) -> |
| 635 |
12 |
#state{socket = Socket, sockmod = SockMod} = State, |
| 636 |
|
%% First unprepare if there is an old statement with this name. |
| 637 |
12 |
setopts(SockMod, Socket, [{active, false}]), |
| 638 |
12 |
State1 = case dict:find(Name, State#state.stmts) of |
| 639 |
|
{ok, OldStmt} -> |
| 640 |
1 |
mysql_protocol:unprepare(OldStmt, SockMod, Socket), |
| 641 |
1 |
State#state{stmts = dict:erase(Name, State#state.stmts)}; |
| 642 |
|
error -> |
| 643 |
11 |
State |
| 644 |
|
end, |
| 645 |
12 |
Rec = mysql_protocol:prepare(Query, SockMod, Socket), |
| 646 |
12 |
setopts(SockMod, Socket, [{active, once}]), |
| 647 |
12 |
State2 = update_state(Rec, State1), |
| 648 |
12 |
case Rec of |
| 649 |
|
#error{} = E -> |
| 650 |
3 |
{{error, error_to_reason(E)}, State2}; |
| 651 |
|
#prepared{} = Stmt -> |
| 652 |
9 |
Stmts1 = dict:store(Name, Stmt, State2#state.stmts), |
| 653 |
9 |
State3 = State2#state{stmts = Stmts1}, |
| 654 |
9 |
{{ok, Name}, State3} |
| 655 |
|
end. |
| 656 |
|
|
| 657 |
|
%% @doc Transforms result sets into a structure appropriate to be returned |
| 658 |
|
%% to the client. |
| 659 |
|
handle_query_call_result([_] = Recs, Query, State) -> |
| 660 |
647 |
handle_query_call_result(Recs, not_applicable, Query, State, []); |
| 661 |
|
handle_query_call_result(Recs, Query, State) -> |
| 662 |
10 |
handle_query_call_result(Recs, 1, Query, State, []). |
| 663 |
|
|
| 664 |
|
handle_query_call_result([], _RecNum, _Query, State, []) -> |
| 665 |
373 |
{ok, State}; |
| 666 |
|
handle_query_call_result([], _RecNum, _Query, State, [{ColumnNames, Rows}]) -> |
| 667 |
267 |
{{ok, ColumnNames, Rows}, State}; |
| 668 |
|
handle_query_call_result([], _RecNum, _Query, State, ResultSetsAcc) -> |
| 669 |
6 |
{{ok, lists:reverse(ResultSetsAcc)}, State}; |
| 670 |
|
handle_query_call_result([Rec|Recs], RecNum, Query, |
| 671 |
|
#state{transaction_levels = L} = State, |
| 672 |
|
ResultSetsAcc) -> |
| 673 |
671 |
RecNum1 = case RecNum of |
| 674 |
647 |
not_applicable -> not_applicable; |
| 675 |
24 |
_ -> RecNum + 1 |
| 676 |
|
end, |
| 677 |
671 |
case Rec of |
| 678 |
|
#ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0, |
| 679 |
|
L /= [] -> |
| 680 |
|
%% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in |
| 681 |
|
%% an implicit commit. |
| 682 |
1 |
Length = length(L), |
| 683 |
1 |
Reply = {implicit_commit, Length, Query}, |
| 684 |
1 |
[] = demonitor_processes(L, Length), |
| 685 |
1 |
{Reply, State#state{transaction_levels = []}}; |
| 686 |
|
#ok{status = Status} -> |
| 687 |
379 |
maybe_log_slow_query(State, Status, RecNum, Query), |
| 688 |
379 |
handle_query_call_result(Recs, RecNum1, Query, State, ResultSetsAcc); |
| 689 |
|
#resultset{cols = ColDefs, rows = Rows, status = Status} -> |
| 690 |
281 |
Names = [Def#col.name || Def <- ColDefs], |
| 691 |
281 |
ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc], |
| 692 |
281 |
maybe_log_slow_query(State, Status, RecNum, Query), |
| 693 |
281 |
handle_query_call_result(Recs, RecNum1, Query, State, ResultSetsAcc1); |
| 694 |
|
#error{code = ?ERROR_DEADLOCK} when L /= [] -> |
| 695 |
|
%% These errors result in an implicit rollback. |
| 696 |
3 |
Reply = {implicit_rollback, length(L), error_to_reason(Rec)}, |
| 697 |
|
%% The transaction is rollbacked, except the BEGIN, so we're still |
| 698 |
|
%% in a transaction. (In 5.7+, also the BEGIN has been rolled back, |
| 699 |
|
%% but here we assume the old behaviour.) |
| 700 |
3 |
NewMonitors = demonitor_processes(L, length(L) - 1), |
| 701 |
3 |
{Reply, State#state{transaction_levels = NewMonitors}}; |
| 702 |
|
#error{} -> |
| 703 |
7 |
{{error, error_to_reason(Rec)}, State} |
| 704 |
|
end. |
| 705 |
|
|
| 706 |
|
%% @doc Schedules (or re-schedules) ping. |
| 707 |
|
schedule_ping(State = #state{ping_timeout = infinity}) -> |
| 708 |
239 |
State; |
| 709 |
|
schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) -> |
| 710 |
733 |
is_reference(Ref) andalso erlang:cancel_timer(Ref), |
| 711 |
733 |
State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}. |
| 712 |
|
|
| 713 |
|
%% @doc Fetches and logs warnings. Query is the query that gave the warnings. |
| 714 |
|
log_warnings(#state{socket = Socket, sockmod = SockMod}, Query) -> |
| 715 |
3 |
setopts(SockMod, Socket, [{active, false}]), |
| 716 |
3 |
{ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>, |
| 717 |
|
SockMod, Socket, |
| 718 |
|
[], no_filtermap_fun, |
| 719 |
|
?cmd_timeout), |
| 720 |
3 |
setopts(SockMod, Socket, [{active, once}]), |
| 721 |
3 |
Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"] |
| 722 |
3 |
|| [Level, Code, Message] <- Rows], |
| 723 |
3 |
error_logger:warning_msg("~s in ~s~n", [Lines, Query]). |
| 724 |
|
|
| 725 |
|
%% @doc Logs slow queries. Query is the query that gave the warnings. |
| 726 |
|
maybe_log_slow_query(#state{log_slow_queries = true}, S, RecNum, Query) |
| 727 |
|
when S band ?SERVER_QUERY_WAS_SLOW /= 0 -> |
| 728 |
3 |
IndexHint = if |
| 729 |
|
S band ?SERVER_STATUS_NO_GOOD_INDEX_USED /= 0 -> |
| 730 |
:-( |
" (with no good index)"; |
| 731 |
|
S band ?SERVER_STATUS_NO_INDEX_USED /= 0 -> |
| 732 |
:-( |
" (with no index)"; |
| 733 |
|
true -> |
| 734 |
3 |
"" |
| 735 |
|
end, |
| 736 |
3 |
QueryNumHint = case RecNum of |
| 737 |
|
not_applicable -> |
| 738 |
1 |
""; |
| 739 |
|
_ -> |
| 740 |
2 |
io_lib:format(" #~b", [RecNum]) |
| 741 |
|
end, |
| 742 |
3 |
error_logger:warning_msg("MySQL query~s~s was slow: ~s~n", |
| 743 |
|
[QueryNumHint, IndexHint, Query]); |
| 744 |
|
maybe_log_slow_query(_, _, _, _) -> |
| 745 |
657 |
ok. |
| 746 |
|
|
| 747 |
|
%% @doc Makes a separate connection and execute KILL QUERY. We do this to get |
| 748 |
|
%% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0. |
| 749 |
|
kill_query(#state{connection_id = ConnId, host = Host, port = Port, |
| 750 |
|
user = User, password = Password, ssl_opts = SSLOpts, |
| 751 |
|
cap_found_rows = SetFoundRows}) -> |
| 752 |
|
%% Connect socket |
| 753 |
3 |
SockOpts = [{active, false}, binary, {packet, raw}], |
| 754 |
3 |
{ok, Socket0} = gen_tcp:connect(Host, Port, SockOpts), |
| 755 |
|
|
| 756 |
|
%% Exchange handshake communication. |
| 757 |
3 |
Result = mysql_protocol:handshake(Host, User, Password, undefined, gen_tcp, |
| 758 |
|
SSLOpts, Socket0, SetFoundRows), |
| 759 |
3 |
case Result of |
| 760 |
|
{ok, #handshake{}, SockMod, Socket} -> |
| 761 |
|
%% Kill and disconnect |
| 762 |
3 |
IdBin = integer_to_binary(ConnId), |
| 763 |
3 |
{ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>, |
| 764 |
|
SockMod, Socket, |
| 765 |
|
[], no_filtermap_fun, |
| 766 |
|
?cmd_timeout), |
| 767 |
3 |
mysql_protocol:quit(SockMod, Socket); |
| 768 |
|
#error{} = E -> |
| 769 |
:-( |
error_logger:error_msg("Failed to connect to kill query: ~p", |
| 770 |
|
[error_to_reason(E)]) |
| 771 |
|
end. |
| 772 |
|
|
| 773 |
|
stop_server(Reason, #state{socket = undefined} = State) -> |
| 774 |
:-( |
{stop, Reason, State}; |
| 775 |
|
stop_server(Reason, |
| 776 |
|
#state{socket = Socket, connection_id = ConnId} = State) -> |
| 777 |
5 |
error_logger:error_msg("Connection Id ~p closing with reason: ~p~n", |
| 778 |
|
[ConnId, Reason]), |
| 779 |
5 |
ok = gen_tcp:close(Socket), |
| 780 |
5 |
{stop, Reason, State#state{socket = undefined, connection_id = undefined}}. |
| 781 |
|
|
| 782 |
|
setopts(gen_tcp, Socket, Opts) -> |
| 783 |
1914 |
inet:setopts(Socket, Opts); |
| 784 |
|
setopts(SockMod, Socket, Opts) -> |
| 785 |
12 |
SockMod:setopts(Socket, Opts). |
| 786 |
|
|
| 787 |
|
demonitor_processes(List, 0) -> |
| 788 |
4 |
List; |
| 789 |
|
demonitor_processes([{_FromPid, MRef}|T], Count) -> |
| 790 |
5 |
erlang:demonitor(MRef), |
| 791 |
5 |
demonitor_processes(T, Count - 1). |