diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index ff148b2714..b585c7cc38 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -724,6 +724,13 @@ partitioned||* = true ; *.example.com:443:[2001:db8::1]:443 ;connect_to = +; Compression settings for replication +;compress_requests = true +;compress_min_size = 1024 +;compression_algorithm = gzip +;accept_encodings = gzip, deflate + + ; Some socket options that might boost performance in some scenarios: ; {nodelay, boolean()} ; {sndbuf, integer()} diff --git a/src/couch_replicator/COMPRESSION_CONFIG.md b/src/couch_replicator/COMPRESSION_CONFIG.md new file mode 100644 index 0000000000..0dfd05b886 --- /dev/null +++ b/src/couch_replicator/COMPRESSION_CONFIG.md @@ -0,0 +1,32 @@ +# CouchDB Replicator Compression + +## Overview + +The replicator now supports configurable HTTP compression to reduce bandwidth during replication. + +## Configuration + +```ini +[replicator] +; Enable compression (default: true) +compress_requests = true + +; Minimum body size to compress in bytes (default: 1024) +compress_min_size = 1024 + +; Algorithm: gzip (default), deflate +compression_algorithm = gzip + +; Accept these encodings in responses +accept_encodings = gzip, deflate +``` + +## Algorithms + +- **gzip** (default): Best compatibility, built-in to Erlang +- **deflate**: Built-in to Erlang, slightly faster than gzip + +## Statistics + +- `couch_replicator.requests.compressed` - Total compressed requests +- `couch_replicator.requests.compressed.{algorithm}` - Per-algorithm stats \ No newline at end of file diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index 7f4f43afd5..abd954aeb2 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -21,6 +21,7 @@ -export([stop_http_worker/0]). -export([full_url/2]). + -import(couch_util, [ get_value/2, get_value/3 @@ -40,6 +41,80 @@ % where we may end up processing an unbounded number of messages. -define(MAX_DISCARDED_MESSAGES, 100). +should_compress_request(Body) when is_binary(Body) -> + MinSize = config:get_integer("replicator", "compress_min_size", 1024), + byte_size(Body) >= MinSize; +should_compress_request(Body) when is_list(Body) -> + should_compress_request(iolist_to_binary(Body)); +should_compress_request(_) -> + false. + +get_compression_algorithm() -> + % Supported: gzip (default), deflate + Algorithm = config:get("replicator", "compression_algorithm", "gzip"), + case Algorithm of + "gzip" -> gzip; + "deflate" -> deflate; + _ -> + couch_log:warning( + "couch_replicator_httpc: Unknown compression algorithm ~p, using gzip", + [Algorithm] + ), + gzip + end. + +compress_body(Body) when is_binary(Body) -> + compress_body_with_algorithm(Body, get_compression_algorithm()); +compress_body(Body) when is_list(Body) -> + compress_body(iolist_to_binary(Body)); +compress_body(Body) -> + Body. + +compress_body_with_algorithm(Body, gzip) -> + zlib:gzip(Body); +compress_body_with_algorithm(Body, deflate) -> + zlib:compress(Body). + +get_content_encoding(gzip) -> "gzip"; +get_content_encoding(deflate) -> "deflate". + +decompress_body(Headers, Body) -> + case lists:keyfind("Content-Encoding", 1, Headers) of + {"Content-Encoding", Encoding} -> + decompress_body_with_encoding(Encoding, Body); + _ -> + Body + end. + +decompress_body_with_encoding("gzip", Body) -> + try + zlib:gunzip(Body) + catch + error:data_error -> + couch_log:warning( + "couch_replicator_httpc: Failed to decompress gzip response, using original", + [] + ), + Body + end; +decompress_body_with_encoding("deflate", Body) -> + try + zlib:uncompress(Body) + catch + error:data_error -> + couch_log:warning( + "couch_replicator_httpc: Failed to decompress deflate response, using original", + [] + ), + Body + end; +decompress_body_with_encoding(Other, Body) -> + couch_log:warning( + "couch_replicator_httpc: Unknown content encoding ~p, using original body", + [Other] + ), + Body. + setup(Db) -> #httpdb{ httpc_pool = nil, @@ -108,11 +183,35 @@ stop_http_worker() -> send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) -> Method = get_value(method, Params, get), - UserHeaders = get_value(headers, Params, []), - Headers1 = merge_headers(BaseHeaders, UserHeaders), + UserHeaders0 = get_value(headers, Params, []), + % Accept multiple compression algorithms + AcceptEncodings = config:get("replicator", "accept_encodings", "gzip, deflate, zstd"), + UserHeaders1 = case lists:keyfind("Accept-Encoding", 1, UserHeaders0) of + false -> [{"Accept-Encoding", AcceptEncodings} | UserHeaders0]; + _ -> UserHeaders0 + end, + Body0 = get_value(body, Params, []), + CompressEnabled = config:get_boolean("replicator", "compress_requests", true), + {Body, UserHeaders2} = case CompressEnabled andalso should_compress_request(Body0) of + true -> + Algorithm = get_compression_algorithm(), + CompressedBody = compress_body(Body0), + ContentEncoding = get_content_encoding(Algorithm), + UpdatedHeaders = case lists:keyfind("Content-Encoding", 1, UserHeaders1) of + false -> [{"Content-Encoding", ContentEncoding} | UserHeaders1]; + _ -> UserHeaders1 + end, + % Track compression algorithm usage + couch_stats:increment_counter([couch_replicator, requests, compressed]), + couch_stats:increment_counter([couch_replicator, requests, compressed, Algorithm]), + {CompressedBody, UpdatedHeaders}; + false -> + {Body0, UserHeaders1} + end, + + Headers1 = merge_headers(BaseHeaders, UserHeaders2), {Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1), Url0 = full_url(HttpDb, Params), - Body = get_value(body, Params, []), case get_value(path, Params) == "_changes" of true -> Timeout = infinity; @@ -182,6 +281,9 @@ process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) -> process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) -> process_stream_response(ReqId, Worker, HttpDb, Params, Callback); process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> + % Decompress body if it's gzip compressed + DecompressedBody = decompress_body(Headers, Body), + case list_to_integer(Code) of R when R =:= 301; R =:= 302; R =:= 303 -> backoff_success(HttpDb, Params), @@ -195,7 +297,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> backoff_success(HttpDb, Params), couch_stats:increment_counter([couch_replicator, responses, success]), EJson = - case Body of + case DecompressedBody of <<>> -> null; Json ->