
1 %% MySQL/OTP – MySQL client library for Erlang/OTP
2 %% Copyright (C) 2014-2018 Viktor Söderqvist
3 %%
4 %% This file is part of MySQL/OTP.
5 %%
6 %% MySQL/OTP is free software: you can redistribute it and/or modify it under
7 %% the terms of the GNU Lesser General Public License as published by the Free
8 %% Software Foundation, either version 3 of the License, or (at your option)
9 %% any later version.
10 %%
11 %% This program is distributed in the hope that it will be useful, but WITHOUT
12 %% ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 %% FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 %% more details.
15 %%
16 %% You should have received a copy of the GNU Lesser General Public License
17 %% along with this program. If not, see <>.
19 %% @doc This module implements parts of the MySQL client/server protocol.
20 %%
21 %% The protocol is described in the document "MySQL Internals" which can be
22 %% found under "MySQL Documentation: Expert Guides" on
23 %%
24 %% TCP communication is not handled in this module. Most of the public functions
25 %% take funs for data communitaction as parameters.
26 %% @private
27 -module(mysql_conn).
29 -behaviour(gen_server).
30 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
31 code_change/3]).
33 -define(default_host, "localhost").
34 -define(default_port, 3306).
35 -define(default_user, <<>>).
36 -define(default_password, <<>>).
37 -define(default_connect_timeout, 5000).
38 -define(default_query_timeout, infinity).
39 -define(default_query_cache_time, 60000). %% for query/3.
40 -define(default_ping_timeout, 60000).
42 -define(cmd_timeout, 3000). %% Timeout used for various commands to the server
44 %% Errors that cause "implicit rollback"
45 -define(ERROR_DEADLOCK, 1213).
47 %% --- Gen_server callbacks ---
49 -include("records.hrl").
50 -include("server_status.hrl").
52 %% Gen_server state
53 -record(state, {server_version, connection_id, socket, sockmod, tcp_opts, ssl_opts,
54 host, port, user, password, database, queries, prepares,
55 auth_plugin_name, auth_plugin_data, allowed_local_paths,
56 log_warnings, log_slow_queries,
57 connect_timeout, ping_timeout, query_timeout, query_cache_time,
58 affected_rows = 0, status = 0, warning_count = 0, insert_id = 0,
59 transaction_levels = [], ping_ref = undefined,
60 stmts = dict:new(), query_cache = empty, cap_found_rows = false}).
62 %% @private
63 init(Opts) ->
64 %% Connect
65 40 Host = proplists:get_value(host, Opts, ?default_host),
67 40 DefaultPort = case Host of
68 1 {local, _LocalAddr} -> 0;
69 39 _NonLocalAddr -> ?default_port
70 end,
71 40 Port = proplists:get_value(port, Opts, DefaultPort),
73 40 User = proplists:get_value(user, Opts, ?default_user),
74 40 Password = proplists:get_value(password, Opts, ?default_password),
75 40 Database = proplists:get_value(database, Opts, undefined),
76 40 AllowedLocalPaths = proplists:get_value(allowed_local_paths, Opts, []),
77 40 LogWarn = proplists:get_value(log_warnings, Opts, true),
78 40 LogSlow = proplists:get_value(log_slow_queries, Opts, false),
79 40 KeepAlive = proplists:get_value(keep_alive, Opts, false),
80 40 ConnectTimeout = proplists:get_value(connect_timeout, Opts,
81 ?default_connect_timeout),
82 40 QueryTimeout = proplists:get_value(query_timeout, Opts,
83 ?default_query_timeout),
84 40 QueryCacheTime = proplists:get_value(query_cache_time, Opts,
85 ?default_query_cache_time),
86 40 TcpOpts = proplists:get_value(tcp_options, Opts, []),
87 40 SetFoundRows = proplists:get_value(found_rows, Opts, false),
88 40 SSLOpts = proplists:get_value(ssl, Opts, undefined),
90 40 Queries = proplists:get_value(queries, Opts, []),
91 40 Prepares = proplists:get_value(prepare, Opts, []),
93 40 true = lists:all(fun mysql_protocol:valid_path/1, AllowedLocalPaths),
95 40 PingTimeout = case KeepAlive of
96 4 true -> ?default_ping_timeout;
97 35 false -> infinity;
98 1 N when N > 0 -> N
99 end,
101 40 State0 = #state{
102 tcp_opts = TcpOpts,
103 ssl_opts = SSLOpts,
104 host = Host, port = Port,
105 user = User, password = Password,
106 database = Database,
107 allowed_local_paths = AllowedLocalPaths,
108 queries = Queries, prepares = Prepares,
109 log_warnings = LogWarn, log_slow_queries = LogSlow,
110 connect_timeout = ConnectTimeout,
111 ping_timeout = PingTimeout,
112 query_timeout = QueryTimeout,
113 query_cache_time = QueryCacheTime,
114 cap_found_rows = (SetFoundRows =:= true)
115 },
117 40 case proplists:get_value(connect_mode, Opts, synchronous) of
118 synchronous ->
119 37 case connect(State0) of
120 {ok, State1} ->
121 34 {ok, State1};
122 {error, Reason} ->
123 3 {stop, Reason}
124 end;
125 asynchronous ->
126 2 gen_server:cast(self(), connect),
127 2 {ok, State0};
128 lazy ->
129 1 {ok, State0}
130 end.
132 connect(#state{connect_timeout = ConnectTimeout} = State) ->
133 40 MainPid = self(),
134 40 Pid = spawn_link(
135 fun () ->
136 40 {ok, State1}=connect_socket(State),
137 40 case handshake(State1) of
138 {ok, #state{sockmod = SockMod, socket = Socket} = State2} ->
139 38 SockMod:controlling_process(Socket, MainPid),
140 38 MainPid ! {self(), {ok, State2}};
141 {error, _} = E ->
142 2 MainPid ! {self(), E}
143 end
144 end
145 ),
146 40 receive
147 {Pid, {ok, State3}} ->
148 38 post_connect(State3);
149 {Pid, {error, _} = E} ->
150 2 E
151 after ConnectTimeout ->
exit(Pid, kill),
{error, timeout}
155 end.
157 connect_socket(#state{tcp_opts = TcpOpts, host = Host, port = Port} = State) ->
158 %% Connect socket
159 40 SockOpts = sanitize_tcp_opts(TcpOpts),
160 40 {ok, Socket} = gen_tcp:connect(Host, Port, SockOpts),
162 %% If buffer wasn't specifically defined make it at least as
163 %% large as recbuf, as suggested by the inet:setopts() docs.
164 40 case proplists:is_defined(buffer, TcpOpts) of
165 true ->
167 false ->
168 40 {ok, [{buffer, Buffer}]} = inet:getopts(Socket, [buffer]),
169 40 {ok, [{recbuf, Recbuf}]} = inet:getopts(Socket, [recbuf]),
170 40 ok = inet:setopts(Socket, [{buffer, max(Buffer, Recbuf)}])
171 end,
173 40 {ok, State#state{socket = Socket}}.
175 sanitize_tcp_opts([{inet_backend, _} = InetBackend | TcpOpts0]) ->
176 %% This option is be used to turn on the experimental socket backend for
177 %% gen_tcp/inet (OTP/23). If given, it must remain the first option in the
178 %% list.
[InetBackend | sanitize_tcp_opts(TcpOpts0)];
180 sanitize_tcp_opts(TcpOpts0) ->
181 40 TcpOpts1 = lists:filter(
182 fun
({mode, _}) -> false;
(binary) -> false;
(list) -> false;
({packet, _}) -> false;
({active, _}) -> false;
(_) -> true
189 end,
190 TcpOpts0
191 ),
192 40 TcpOpts2 = case lists:keymember(nodelay, 1, TcpOpts1) of
true -> TcpOpts1;
194 40 false -> [{nodelay, true} | TcpOpts1]
195 end,
196 40 [binary, {packet, raw}, {active, false} | TcpOpts2].
198 handshake(#state{socket = Socket0, ssl_opts = SSLOpts,
199 host = Host, user = User, password = Password, database = Database,
200 cap_found_rows = SetFoundRows} = State0) ->
201 %% Exchange handshake communication.
202 40 Result = mysql_protocol:handshake(Host, User, Password, Database, gen_tcp,
203 SSLOpts, Socket0, SetFoundRows),
204 40 case Result of
205 {ok, Handshake, SockMod, Socket} ->
206 38 setopts(SockMod, Socket, [{active, once}]),
207 #handshake{server_version = Version, connection_id = ConnId,
208 status = Status,
209 auth_plugin_name = AuthPluginName,
210 38 auth_plugin_data = AuthPluginData} = Handshake,
211 38 State1 = State0#state{server_version = Version, connection_id = ConnId,
212 sockmod = SockMod,
213 socket = Socket,
214 auth_plugin_name = AuthPluginName,
215 auth_plugin_data = AuthPluginData,
216 status = Status},
217 38 {ok, State1};
218 #error{} = E ->
219 2 {error, error_to_reason(E)}
220 end.
222 post_connect(#state{queries = Queries, prepares = Prepares} = State) ->
223 46 case execute_on_connect(Queries, Prepares, State) of
224 {ok, State1} ->
225 42 process_flag(trap_exit, true),
226 42 State2 = schedule_ping(State1),
227 42 {ok, State2};
228 {error, _} = E ->
229 4 E
230 end.
232 execute_on_connect([], [], State) ->
233 42 {ok, State};
234 execute_on_connect([], [{Name, Stmt}|Prepares], State) ->
235 5 case named_prepare(Name, Stmt, State) of
236 {{ok, Name}, State1} ->
237 3 execute_on_connect([], Prepares, State1);
238 {{error, _} = E, _} ->
239 2 E
240 end;
241 execute_on_connect([Query|Queries], Prepares, State) ->
242 9 case query(Query, no_filtermap_fun, default_timeout, State) of
243 {ok, State1} ->
244 3 execute_on_connect(Queries, Prepares, State1);
245 {{ok, _}, State1} ->
246 2 execute_on_connect(Queries, Prepares, State1);
247 {{ok, _, _}, State1} ->
248 2 execute_on_connect(Queries, Prepares, State1);
249 {{error, _} = E, _} ->
250 2 E
251 end.
253 %% @private
254 %% @doc
255 %%
256 %% Query and execute calls:
257 %%
258 %% <ul>
259 %% <li>{query, Query, FilterMap, Timeout}</li>
260 %% <li>{param_query, Query, Params, FilterMap, Timeout}</li>
261 %% <li>{execute, Stmt, Args, FilterMap, Timeout}</li>
262 %% </ul>
263 %%
264 %% For the calls listed above, we return these values:
265 %%
266 %% <dl>
267 %% <dt>`ok'</dt>
268 %% <dd>Success without returning any table data (UPDATE, etc.)</dd>
269 %% <dt>`{ok, ColumnNames, Rows}'</dt>
270 %% <dd>Queries returning one result set of table data</dd>
271 %% <dt>`{ok, [{ColumnNames, Rows}, ...]}'</dt>
272 %% <dd>Queries returning more than one result set of table data</dd>
273 %% <dt>`{error, ServerReason}'</dt>
274 %% <dd>MySQL server error</dd>
275 %% <dt>`{implicit_commit, NestingLevel, Query}'</dt>
276 %% <dd>A DDL statement (e.g. CREATE TABLE, ALTER TABLE, etc.) results in
277 %% an implicit commit.
278 %%
279 %% If the caller is in a (nested) transaction, it must be aborted. To be
280 %% able to handle this in the caller's process, we also return the
281 %% nesting level.</dd>
282 %% <dt>`{implicit_rollback, NestingLevel, ServerReason}'</dt>
283 %% <dd>This errors results in an implicit rollback: `{1213, <<"40001">>,
284 %% <<"Deadlock found when trying to get lock; try restarting "
285 %% "transaction">>}'.
286 %%
287 %% If the caller is in a (nested) transaction, it must be aborted. To be
288 %% able to handle this in the caller's process, we also return the
289 %% nesting level.</dd>
290 %% </dl>
291 handle_call(is_connected, _, #state{socket = Socket} = State) ->
292 4 {reply, Socket =/= undefined, State};
293 handle_call(Msg, From, #state{socket = undefined} = State) ->
294 1 case connect(State) of
295 {ok, State1} ->
296 1 handle_call(Msg, From, State1);
297 {error, _} = E ->
{stop, E, State}
299 end;
300 handle_call({query, Query, FilterMap, Timeout}, _From, State) ->
301 450 {Reply, State1} = query(Query, FilterMap, Timeout, State),
302 450 {reply, Reply, State1};
303 handle_call({param_query, Query, Params, FilterMap, default_timeout}, From,
304 State) ->
305 12 handle_call({param_query, Query, Params, FilterMap,
306 State#state.query_timeout}, From, State);
307 handle_call({param_query, Query, Params, FilterMap, Timeout}, _From,
308 #state{socket = Socket, sockmod = SockMod} = State) ->
309 %% Parametrized query: Prepared statement cached with the query as the key
310 13 QueryBin = iolist_to_binary(Query),
311 13 Cache = State#state.query_cache,
312 13 {StmtResult, Cache1} = case mysql_cache:lookup(QueryBin, Cache) of
313 {found, FoundStmt, NewCache} ->
314 %% Found
315 2 {{ok, FoundStmt}, NewCache};
316 not_found ->
317 %% Prepare
318 11 setopts(SockMod, Socket, [{active, false}]),
319 11 Rec = mysql_protocol:prepare(Query, SockMod, Socket),
320 11 setopts(SockMod, Socket, [{active, once}]),
321 11 case Rec of
322 #error{} = E ->
323 1 {{error, error_to_reason(E)}, Cache};
324 #prepared{} = Stmt ->
325 %% If the first entry in the cache, start the timer.
326 10 Cache == empty andalso begin
327 6 When = State#state.query_cache_time * 2,
328 6 erlang:send_after(When, self(), query_cache)
329 end,
330 10 {{ok, Stmt}, mysql_cache:store(QueryBin, Stmt, Cache)}
331 end
332 end,
333 13 case StmtResult of
334 {ok, StmtRec} ->
335 12 State1 = State#state{query_cache = Cache1},
336 12 {Reply, State2} = execute_stmt(StmtRec, Params, FilterMap, Timeout, State1),
337 12 {reply, Reply, State2};
338 PrepareError ->
339 1 {reply, PrepareError, State}
340 end;
341 handle_call({execute, Stmt, Args, FilterMap, default_timeout}, From, State) ->
342 186 handle_call({execute, Stmt, Args, FilterMap, State#state.query_timeout},
343 From, State);
344 handle_call({execute, Stmt, Args, FilterMap, Timeout}, _From, State) ->
345 187 case dict:find(Stmt, State#state.stmts) of
346 {ok, StmtRec} ->
347 186 {Reply, State1} = execute_stmt(StmtRec, Args, FilterMap, Timeout, State),
348 186 {reply, Reply, State1};
349 error ->
350 1 {reply, {error, not_prepared}, State}
351 end;
352 handle_call({prepare, Query}, _From, State) ->
353 93 #state{socket = Socket, sockmod = SockMod} = State,
354 93 setopts(SockMod, Socket, [{active, false}]),
355 93 Rec = mysql_protocol:prepare(Query, SockMod, Socket),
356 93 setopts(SockMod, Socket, [{active, once}]),
357 93 State1 = update_state(Rec, State),
358 93 case Rec of
359 #error{} = E ->
360 1 {reply, {error, error_to_reason(E)}, State1};
361 #prepared{statement_id = Id} = Stmt ->
362 92 Stmts1 = dict:store(Id, Stmt, State1#state.stmts),
363 92 State2 = State#state{stmts = Stmts1},
364 92 {reply, {ok, Id}, State2}
365 end;
366 handle_call({prepare, Name, Query}, _From, State) when is_atom(Name) ->
367 7 {Reply, State1} = named_prepare(Name, Query, State),
368 7 {reply, Reply, State1};
369 handle_call({unprepare, Stmt}, _From, State) when is_atom(Stmt);
370 is_integer(Stmt) ->
371 91 case dict:find(Stmt, State#state.stmts) of
372 {ok, StmtRec} ->
373 89 #state{socket = Socket, sockmod = SockMod} = State,
374 89 setopts(SockMod, Socket, [{active, false}]),
375 89 mysql_protocol:unprepare(StmtRec, SockMod, Socket),
376 89 setopts(SockMod, Socket, [{active, once}]),
377 89 State1 = State#state{stmts = dict:erase(Stmt, State#state.stmts)},
378 89 State2 = schedule_ping(State1),
379 89 {reply, ok, State2};
380 error ->
381 2 {reply, {error, not_prepared}, State}
382 end;
383 handle_call({change_user, Username, Password, Options}, From,
384 State = #state{transaction_levels = []}) ->
385 #state{socket = Socket, sockmod = SockMod,
386 auth_plugin_name = AuthPluginName,
387 auth_plugin_data = AuthPluginData,
388 9 server_version = ServerVersion} = State,
389 9 Database = proplists:get_value(database, Options, undefined),
390 9 Queries = proplists:get_value(queries, Options, []),
391 9 Prepares = proplists:get_value(prepare, Options, []),
392 9 setopts(SockMod, Socket, [{active, false}]),
393 9 Result = mysql_protocol:change_user(SockMod, Socket, Username, Password,
394 AuthPluginName, AuthPluginData, Database,
395 ServerVersion),
396 9 setopts(SockMod, Socket, [{active, once}]),
397 9 State1 = update_state(Result, State),
398 9 State1#state.warning_count > 0 andalso State1#state.log_warnings
andalso log_warnings(State1, "CHANGE USER"),
400 9 State2 = State1#state{query_cache = empty, stmts = dict:new()},
401 9 case Result of
402 #ok{} ->
403 8 State3 = State2#state{user = Username, password = Password,
404 database=Database, queries=Queries,
405 prepares=Prepares},
406 8 case post_connect(State3) of
407 {ok, State4} ->
408 6 {reply, ok, State4};
409 {error, Reason} = E ->
410 2 gen_server:reply(From, E),
411 2 stop_server(Reason, State3)
412 end;
413 #error{} = E ->
414 1 gen_server:reply(From, {error, error_to_reason(E)}),
415 1 stop_server(change_user_failed, State2)
416 end;
417 handle_call(reset_connection, _From, #state{socket = Socket, sockmod = SockMod} = State) ->
418 1 setopts(SockMod, Socket, [{active, false}]),
419 1 Result = mysql_protocol:reset_connnection(SockMod, Socket),
420 1 setopts(SockMod, Socket, [{active, once}]),
421 1 State1 = update_state(Result, State),
422 1 Reply = case Result of
#ok{} -> ok;
424 #error{} = E ->
425 %% 'COM_RESET_CONNECTION' is added in MySQL 5.7 and MariaDB 10
426 %% "Unkown command" is returned when MySQL =< 5.6 or MariaDB =< 5.5
427 1 {error, error_to_reason(E)}
428 end,
429 1 {reply, Reply, State1};
431 handle_call(warning_count, _From, State) ->
432 2 {reply, State#state.warning_count, State};
433 handle_call(insert_id, _From, State) ->
434 3 {reply, State#state.insert_id, State};
435 handle_call(affected_rows, _From, State) ->
436 3 {reply, State#state.affected_rows, State};
437 handle_call(autocommit, _From, State) ->
438 3 {reply, State#state.status band ?SERVER_STATUS_AUTOCOMMIT /= 0, State};
439 handle_call(backslash_escapes_enabled, _From, State = #state{status = S}) ->
440 2 {reply, S band ?SERVER_STATUS_NO_BACKSLASH_ESCAPES == 0, State};
441 handle_call(in_transaction, _From, State) ->
442 25 {reply, State#state.status band ?SERVER_STATUS_IN_TRANS /= 0, State};
443 handle_call(start_transaction, {FromPid, _},
444 State = #state{socket = Socket, sockmod = SockMod,
445 transaction_levels = L, status = Status})
446 when Status band ?SERVER_STATUS_IN_TRANS == 0, L == [];
447 Status band ?SERVER_STATUS_IN_TRANS /= 0, L /= [] ->
448 28 MRef = erlang:monitor(process, FromPid),
449 28 Query = case L of
450 17 [] -> <<"BEGIN">>;
451 11 _ -> <<"SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
452 end,
453 28 setopts(SockMod, Socket, [{active, false}]),
454 28 {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
455 [], no_filtermap_fun,
456 ?cmd_timeout),
457 28 setopts(SockMod, Socket, [{active, once}]),
458 28 State1 = update_state(Res, State),
459 28 {reply, ok, State1#state{transaction_levels = [{FromPid, MRef} | L]}};
460 handle_call(rollback, {FromPid, _},
461 State = #state{socket = Socket, sockmod = SockMod, status = Status,
462 transaction_levels = [{FromPid, MRef} | L]})
463 when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
464 6 erlang:demonitor(MRef),
465 6 Query = case L of
466 5 [] -> <<"ROLLBACK">>;
467 1 _ -> <<"ROLLBACK TO s", (integer_to_binary(length(L)))/binary>>
468 end,
469 6 setopts(SockMod, Socket, [{active, false}]),
470 6 {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
471 [], no_filtermap_fun,
472 ?cmd_timeout),
473 6 setopts(SockMod, Socket, [{active, once}]),
474 6 State1 = update_state(Res, State),
475 6 {reply, ok, State1#state{transaction_levels = L}};
476 handle_call(commit, {FromPid, _},
477 State = #state{socket = Socket, sockmod = SockMod, status = Status,
478 transaction_levels = [{FromPid, MRef} | L]})
479 when Status band ?SERVER_STATUS_IN_TRANS /= 0 ->
480 16 erlang:demonitor(MRef),
481 16 Query = case L of
482 10 [] -> <<"COMMIT">>;
483 6 _ -> <<"RELEASE SAVEPOINT s", (integer_to_binary(length(L)))/binary>>
484 end,
485 16 setopts(SockMod, Socket, [{active, false}]),
486 16 {ok, [Res = #ok{}]} = mysql_protocol:query(Query, SockMod, Socket,
487 [], no_filtermap_fun,
488 ?cmd_timeout),
489 16 setopts(SockMod, Socket, [{active, once}]),
490 16 State1 = update_state(Res, State),
491 16 {reply, ok, State1#state{transaction_levels = L}}.
493 %% @private
494 handle_cast(connect, #state{socket = undefined} = State) ->
495 2 case connect(State) of
496 {ok, State1} ->
497 1 {noreply, State1};
498 {error, _} = E ->
499 1 {stop, E, State}
500 end;
501 handle_cast(connect, State) ->
{noreply, State};
503 handle_cast(_Msg, State) ->
504 1 {noreply, State}.
506 %% @private
507 handle_info(query_cache, #state{query_cache = Cache,
508 query_cache_time = CacheTime} = State) ->
509 %% Evict expired queries/statements in the cache used by query/3.
510 1 {Evicted, Cache1} = mysql_cache:evict_older_than(Cache, CacheTime),
511 %% Unprepare the evicted statements
512 1 #state{socket = Socket, sockmod = SockMod} = State,
513 1 setopts(SockMod, Socket, [{active, false}]),
514 1 lists:foreach(fun ({_Query, Stmt}) ->
515 1 mysql_protocol:unprepare(Stmt, SockMod, Socket)
516 end,
517 Evicted),
518 1 setopts(SockMod, Socket, [{active, once}]),
519 %% If nonempty, schedule eviction again.
520 1 mysql_cache:size(Cache1) > 0 andalso
erlang:send_after(CacheTime, self(), query_cache),
522 1 {noreply, State#state{query_cache = Cache1}};
523 handle_info({'DOWN', _MRef, _, Pid, _Info}, State) ->
524 1 stop_server({application_process_died, Pid}, State);
525 handle_info(ping, #state{socket = Socket, sockmod = SockMod} = State) ->
526 4 setopts(SockMod, Socket, [{active, false}]),
527 4 #ok{} = mysql_protocol:ping(SockMod, Socket),
528 3 setopts(SockMod, Socket, [{active, once}]),
529 3 {noreply, schedule_ping(State)};
530 handle_info({tcp_closed, _Socket}, State) ->
531 1 {stop, normal, State#state{socket = undefined, connection_id = undefined}};
532 handle_info({tcp_error, _Socket, Reason}, State) ->
533 1 stop_server({tcp_error, Reason}, State);
534 handle_info(_Info, State) ->
535 2 {noreply, State}.
537 %% @private
538 terminate(Reason, #state{socket = Socket, sockmod = SockMod})
539 when Socket =/= undefined andalso (Reason == normal orelse Reason == shutdown) ->
540 %% Send the goodbye message for politeness.
541 29 setopts(SockMod, Socket, [{active, false}]),
542 29 mysql_protocol:quit(SockMod, Socket);
543 terminate(_Reason, _State) ->
544 9 ok.
546 %% @private
547 code_change(_OldVsn, State = #state{}, _Extra) ->
548 1 {ok, State};
549 code_change(_OldVsn, _State, _Extra) ->
550 1 {error, incompatible_state}.
552 %% --- Helpers ---
554 %% @doc Executes a prepared statement and returns {Reply, NewState}.
555 execute_stmt(Stmt, Args, FilterMap, Timeout, State) ->
556 #state{socket = Socket, sockmod = SockMod,
557 198 allowed_local_paths = AllowedPaths} = State,
558 198 setopts(SockMod, Socket, [{active, false}]),
559 198 {ok, Recs} = case mysql_protocol:execute(Stmt, Args, SockMod, Socket,
560 AllowedPaths, FilterMap,
561 Timeout) of
562 {error, timeout} when State#state.server_version >= [5, 0, 0] ->
563 2 kill_query(State),
564 2 mysql_protocol:fetch_execute_response(SockMod, Socket,
565 [], FilterMap, ?cmd_timeout);
566 {error, timeout} ->
567 %% For MySQL 4.x.x there is no way to recover from timeout except
568 %% killing the connection itself.
570 QueryResult ->
571 196 QueryResult
572 end,
573 198 setopts(SockMod, Socket, [{active, once}]),
574 198 State1 = lists:foldl(fun update_state/2, State, Recs),
575 198 State1#state.warning_count > 0 andalso State1#state.log_warnings
576 2 andalso log_warnings(State1, Stmt#prepared.orig_query),
577 198 handle_query_call_result(Recs, Stmt#prepared.orig_query, State1).
579 %% @doc Produces a tuple to return as an error reason.
580 -spec error_to_reason(#error{}) -> mysql:server_reason().
581 error_to_reason(#error{code = Code, state = State, msg = Msg}) ->
582 19 {Code, State, Msg}.
584 %% @doc Updates a state with information from a response. Also re-schedules
585 %% ping.
586 -spec update_state(#ok{} | #eof{} | any(), #state{}) -> #state{}.
587 update_state(Rec, State) ->
588 838 State1 = case Rec of
589 #ok{status = S, affected_rows = R, insert_id = Id, warning_count = W} ->
590 440 State#state{status = S, affected_rows = R, insert_id = Id,
591 warning_count = W};
592 #resultset{status = S, warning_count = W} ->
593 281 State#state{status = S, warning_count = W};
594 #prepared{warning_count = W} ->
595 101 State#state{warning_count = W};
596 _Other ->
597 %% This includes errors.
598 %% Reset some things. (Note: We don't reset status and insert_id.)
599 16 State#state{warning_count = 0, affected_rows = 0}
600 end,
601 838 schedule_ping(State1).
603 %% @doc executes an unparameterized query and returns {Reply, NewState}.
604 query(Query, FilterMap, default_timeout,
605 #state{query_timeout = DefaultTimeout} = State) ->
606 457 query(Query, FilterMap, DefaultTimeout, State);
607 query(Query, FilterMap, Timeout, State) ->
608 #state{sockmod = SockMod, socket = Socket,
609 459 allowed_local_paths = AllowedPaths} = State,
610 459 setopts(SockMod, Socket, [{active, false}]),
611 459 Result = mysql_protocol:query(Query, SockMod, Socket, AllowedPaths,
612 FilterMap, Timeout),
613 459 {ok, Recs} = case Result of
614 {error, timeout} when State#state.server_version >= [5, 0, 0] ->
615 1 kill_query(State),
616 1 mysql_protocol:fetch_query_response(SockMod, Socket,
617 [], FilterMap,
618 ?cmd_timeout);
619 {error, timeout} ->
620 %% For MySQL 4.x.x there is no way to recover from timeout except
621 %% killing the connection itself.
623 QueryResult ->
624 458 QueryResult
625 end,
626 459 setopts(SockMod, Socket, [{active, once}]),
627 459 State1 = lists:foldl(fun update_state/2, State, Recs),
628 459 State1#state.warning_count > 0 andalso State1#state.log_warnings
629 1 andalso log_warnings(State1, Query),
630 459 handle_query_call_result(Recs, Query, State1).
632 %% @doc Prepares a named query and returns {{ok, Name}, NewState} or
633 %% {{error, Reason}, NewState}.
634 named_prepare(Name, Query, State) ->
635 12 #state{socket = Socket, sockmod = SockMod} = State,
636 %% First unprepare if there is an old statement with this name.
637 12 setopts(SockMod, Socket, [{active, false}]),
638 12 State1 = case dict:find(Name, State#state.stmts) of
639 {ok, OldStmt} ->
640 1 mysql_protocol:unprepare(OldStmt, SockMod, Socket),
641 1 State#state{stmts = dict:erase(Name, State#state.stmts)};
642 error ->
643 11 State
644 end,
645 12 Rec = mysql_protocol:prepare(Query, SockMod, Socket),
646 12 setopts(SockMod, Socket, [{active, once}]),
647 12 State2 = update_state(Rec, State1),
648 12 case Rec of
649 #error{} = E ->
650 3 {{error, error_to_reason(E)}, State2};
651 #prepared{} = Stmt ->
652 9 Stmts1 = dict:store(Name, Stmt, State2#state.stmts),
653 9 State3 = State2#state{stmts = Stmts1},
654 9 {{ok, Name}, State3}
655 end.
657 %% @doc Transforms result sets into a structure appropriate to be returned
658 %% to the client.
659 handle_query_call_result([_] = Recs, Query, State) ->
660 647 handle_query_call_result(Recs, not_applicable, Query, State, []);
661 handle_query_call_result(Recs, Query, State) ->
662 10 handle_query_call_result(Recs, 1, Query, State, []).
664 handle_query_call_result([], _RecNum, _Query, State, []) ->
665 373 {ok, State};
666 handle_query_call_result([], _RecNum, _Query, State, [{ColumnNames, Rows}]) ->
667 267 {{ok, ColumnNames, Rows}, State};
668 handle_query_call_result([], _RecNum, _Query, State, ResultSetsAcc) ->
669 6 {{ok, lists:reverse(ResultSetsAcc)}, State};
670 handle_query_call_result([Rec|Recs], RecNum, Query,
671 #state{transaction_levels = L} = State,
672 ResultSetsAcc) ->
673 671 RecNum1 = case RecNum of
674 647 not_applicable -> not_applicable;
675 24 _ -> RecNum + 1
676 end,
677 671 case Rec of
678 #ok{status = Status} when Status band ?SERVER_STATUS_IN_TRANS == 0,
679 L /= [] ->
680 %% DDL statements (e.g. CREATE TABLE, ALTER TABLE, etc.) result in
681 %% an implicit commit.
682 1 Length = length(L),
683 1 Reply = {implicit_commit, Length, Query},
684 1 [] = demonitor_processes(L, Length),
685 1 {Reply, State#state{transaction_levels = []}};
686 #ok{status = Status} ->
687 379 maybe_log_slow_query(State, Status, RecNum, Query),
688 379 handle_query_call_result(Recs, RecNum1, Query, State, ResultSetsAcc);
689 #resultset{cols = ColDefs, rows = Rows, status = Status} ->
690 281 Names = [ || Def <- ColDefs],
691 281 ResultSetsAcc1 = [{Names, Rows} | ResultSetsAcc],
692 281 maybe_log_slow_query(State, Status, RecNum, Query),
693 281 handle_query_call_result(Recs, RecNum1, Query, State, ResultSetsAcc1);
694 #error{code = ?ERROR_DEADLOCK} when L /= [] ->
695 %% These errors result in an implicit rollback.
696 3 Reply = {implicit_rollback, length(L), error_to_reason(Rec)},
697 %% The transaction is rollbacked, except the BEGIN, so we're still
698 %% in a transaction. (In 5.7+, also the BEGIN has been rolled back,
699 %% but here we assume the old behaviour.)
700 3 NewMonitors = demonitor_processes(L, length(L) - 1),
701 3 {Reply, State#state{transaction_levels = NewMonitors}};
702 #error{} ->
703 7 {{error, error_to_reason(Rec)}, State}
704 end.
706 %% @doc Schedules (or re-schedules) ping.
707 schedule_ping(State = #state{ping_timeout = infinity}) ->
708 239 State;
709 schedule_ping(State = #state{ping_timeout = Timeout, ping_ref = Ref}) ->
710 733 is_reference(Ref) andalso erlang:cancel_timer(Ref),
711 733 State#state{ping_ref = erlang:send_after(Timeout, self(), ping)}.
713 %% @doc Fetches and logs warnings. Query is the query that gave the warnings.
714 log_warnings(#state{socket = Socket, sockmod = SockMod}, Query) ->
715 3 setopts(SockMod, Socket, [{active, false}]),
716 3 {ok, [#resultset{rows = Rows}]} = mysql_protocol:query(<<"SHOW WARNINGS">>,
717 SockMod, Socket,
718 [], no_filtermap_fun,
719 ?cmd_timeout),
720 3 setopts(SockMod, Socket, [{active, once}]),
721 3 Lines = [[Level, " ", integer_to_binary(Code), ": ", Message, "\n"]
722 3 || [Level, Code, Message] <- Rows],
723 3 error_logger:warning_msg("~s in ~s~n", [Lines, Query]).
725 %% @doc Logs slow queries. Query is the query that gave the warnings.
726 maybe_log_slow_query(#state{log_slow_queries = true}, S, RecNum, Query)
727 when S band ?SERVER_QUERY_WAS_SLOW /= 0 ->
728 3 IndexHint = if
" (with no good index)";
" (with no index)";
733 true ->
734 3 ""
735 end,
736 3 QueryNumHint = case RecNum of
737 not_applicable ->
738 1 "";
739 _ ->
740 2 io_lib:format(" #~b", [RecNum])
741 end,
742 3 error_logger:warning_msg("MySQL query~s~s was slow: ~s~n",
743 [QueryNumHint, IndexHint, Query]);
744 maybe_log_slow_query(_, _, _, _) ->
745 657 ok.
747 %% @doc Makes a separate connection and execute KILL QUERY. We do this to get
748 %% our main connection back to normal. KILL QUERY appeared in MySQL 5.0.0.
749 kill_query(#state{connection_id = ConnId, host = Host, port = Port,
750 user = User, password = Password, ssl_opts = SSLOpts,
751 cap_found_rows = SetFoundRows}) ->
752 %% Connect socket
753 3 SockOpts = [{active, false}, binary, {packet, raw}],
754 3 {ok, Socket0} = gen_tcp:connect(Host, Port, SockOpts),
756 %% Exchange handshake communication.
757 3 Result = mysql_protocol:handshake(Host, User, Password, undefined, gen_tcp,
758 SSLOpts, Socket0, SetFoundRows),
759 3 case Result of
760 {ok, #handshake{}, SockMod, Socket} ->
761 %% Kill and disconnect
762 3 IdBin = integer_to_binary(ConnId),
763 3 {ok, [#ok{}]} = mysql_protocol:query(<<"KILL QUERY ", IdBin/binary>>,
764 SockMod, Socket,
765 [], no_filtermap_fun,
766 ?cmd_timeout),
767 3 mysql_protocol:quit(SockMod, Socket);
768 #error{} = E ->
error_logger:error_msg("Failed to connect to kill query: ~p",
770 [error_to_reason(E)])
771 end.
773 stop_server(Reason, #state{socket = undefined} = State) ->
{stop, Reason, State};
775 stop_server(Reason,
776 #state{socket = Socket, connection_id = ConnId} = State) ->
777 5 error_logger:error_msg("Connection Id ~p closing with reason: ~p~n",
778 [ConnId, Reason]),
779 5 ok = gen_tcp:close(Socket),
780 5 {stop, Reason, State#state{socket = undefined, connection_id = undefined}}.
782 setopts(gen_tcp, Socket, Opts) ->
783 1914 inet:setopts(Socket, Opts);
784 setopts(SockMod, Socket, Opts) ->
785 12 SockMod:setopts(Socket, Opts).
787 demonitor_processes(List, 0) ->
788 4 List;
789 demonitor_processes([{_FromPid, MRef}|T], Count) ->
790 5 erlang:demonitor(MRef),
791 5 demonitor_processes(T, Count - 1).
Line Hits Source