Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rel/overlay/etc/default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,13 @@ partitioned||* = true
; *.example.com:443:[2001:db8::1]:443
;connect_to =

; Compression settings for replication
;compress_requests = true
;compress_min_size = 1024
;compression_algorithm = gzip
;accept_encodings = gzip, deflate


; Some socket options that might boost performance in some scenarios:
; {nodelay, boolean()}
; {sndbuf, integer()}
Expand Down
32 changes: 32 additions & 0 deletions src/couch_replicator/COMPRESSION_CONFIG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# CouchDB Replicator Compression

## Overview

The replicator now supports configurable HTTP compression to reduce bandwidth during replication.

## Configuration

```ini
[replicator]
; Enable compression (default: true)
compress_requests = true

; Minimum body size to compress in bytes (default: 1024)
compress_min_size = 1024

; Algorithm: gzip (default), deflate
compression_algorithm = gzip

; Accept these encodings in responses
accept_encodings = gzip, deflate
```

## Algorithms

- **gzip** (default): Best compatibility, built-in to Erlang
- **deflate**: Built-in to Erlang, slightly faster than gzip

## Statistics

- `couch_replicator.requests.compressed` - Total compressed requests
- `couch_replicator.requests.compressed.{algorithm}` - Per-algorithm stats
110 changes: 106 additions & 4 deletions src/couch_replicator/src/couch_replicator_httpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
-export([stop_http_worker/0]).
-export([full_url/2]).


-import(couch_util, [
get_value/2,
get_value/3
Expand All @@ -40,6 +41,80 @@
% where we may end up processing an unbounded number of messages.
-define(MAX_DISCARDED_MESSAGES, 100).

should_compress_request(Body) when is_binary(Body) ->
MinSize = config:get_integer("replicator", "compress_min_size", 1024),
byte_size(Body) >= MinSize;
should_compress_request(Body) when is_list(Body) ->
should_compress_request(iolist_to_binary(Body));
should_compress_request(_) ->
false.

get_compression_algorithm() ->
% Supported: gzip (default), deflate
Algorithm = config:get("replicator", "compression_algorithm", "gzip"),
case Algorithm of
"gzip" -> gzip;
"deflate" -> deflate;
_ ->
couch_log:warning(
"couch_replicator_httpc: Unknown compression algorithm ~p, using gzip",
[Algorithm]
),
gzip
end.

compress_body(Body) when is_binary(Body) ->
compress_body_with_algorithm(Body, get_compression_algorithm());
compress_body(Body) when is_list(Body) ->
compress_body(iolist_to_binary(Body));
compress_body(Body) ->
Body.

compress_body_with_algorithm(Body, gzip) ->
zlib:gzip(Body);
compress_body_with_algorithm(Body, deflate) ->
zlib:compress(Body).

get_content_encoding(gzip) -> "gzip";
get_content_encoding(deflate) -> "deflate".

decompress_body(Headers, Body) ->
case lists:keyfind("Content-Encoding", 1, Headers) of
{"Content-Encoding", Encoding} ->
decompress_body_with_encoding(Encoding, Body);
_ ->
Body
end.

decompress_body_with_encoding("gzip", Body) ->
try
zlib:gunzip(Body)
catch
error:data_error ->
couch_log:warning(
"couch_replicator_httpc: Failed to decompress gzip response, using original",
[]
),
Body
end;
decompress_body_with_encoding("deflate", Body) ->
try
zlib:uncompress(Body)
catch
error:data_error ->
couch_log:warning(
"couch_replicator_httpc: Failed to decompress deflate response, using original",
[]
),
Body
end;
decompress_body_with_encoding(Other, Body) ->
couch_log:warning(
"couch_replicator_httpc: Unknown content encoding ~p, using original body",
[Other]
),
Body.

setup(Db) ->
#httpdb{
httpc_pool = nil,
Expand Down Expand Up @@ -108,11 +183,35 @@ stop_http_worker() ->

send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
Method = get_value(method, Params, get),
UserHeaders = get_value(headers, Params, []),
Headers1 = merge_headers(BaseHeaders, UserHeaders),
UserHeaders0 = get_value(headers, Params, []),
% Accept multiple compression algorithms
AcceptEncodings = config:get("replicator", "accept_encodings", "gzip, deflate, zstd"),
UserHeaders1 = case lists:keyfind("Accept-Encoding", 1, UserHeaders0) of
false -> [{"Accept-Encoding", AcceptEncodings} | UserHeaders0];
_ -> UserHeaders0
end,
Body0 = get_value(body, Params, []),
CompressEnabled = config:get_boolean("replicator", "compress_requests", true),
{Body, UserHeaders2} = case CompressEnabled andalso should_compress_request(Body0) of
true ->
Algorithm = get_compression_algorithm(),
CompressedBody = compress_body(Body0),
ContentEncoding = get_content_encoding(Algorithm),
UpdatedHeaders = case lists:keyfind("Content-Encoding", 1, UserHeaders1) of
false -> [{"Content-Encoding", ContentEncoding} | UserHeaders1];
_ -> UserHeaders1
end,
% Track compression algorithm usage
couch_stats:increment_counter([couch_replicator, requests, compressed]),
couch_stats:increment_counter([couch_replicator, requests, compressed, Algorithm]),
{CompressedBody, UpdatedHeaders};
false ->
{Body0, UserHeaders1}
end,

Headers1 = merge_headers(BaseHeaders, UserHeaders2),
{Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1),
Url0 = full_url(HttpDb, Params),
Body = get_value(body, Params, []),
case get_value(path, Params) == "_changes" of
true ->
Timeout = infinity;
Expand Down Expand Up @@ -182,6 +281,9 @@ process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
% Decompress body if it's gzip compressed
DecompressedBody = decompress_body(Headers, Body),

case list_to_integer(Code) of
R when R =:= 301; R =:= 302; R =:= 303 ->
backoff_success(HttpDb, Params),
Expand All @@ -195,7 +297,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
backoff_success(HttpDb, Params),
couch_stats:increment_counter([couch_replicator, responses, success]),
EJson =
case Body of
case DecompressedBody of
<<>> ->
null;
Json ->
Expand Down