From 8139750d3f99ef1170d1b4427ff1b9415c4efb39 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 20 Nov 2024 16:06:52 +0800 Subject: [PATCH 1/8] add ChangingPointSamplingProcessor --- .../ChangingPointSamplingProcessor.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java new file mode 100644 index 0000000000000..cd243fba2140b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changingpoint; + +public class ChangingPointSamplingProcessor { +} From 0e23f8cacf1c0385634bd49db5a3179767070419 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 21 Nov 2024 19:40:42 +0800 Subject: [PATCH 2/8] Pipe: Reconstruct downsampling processor --- .../PipeDataRegionProcessorConstructor.java | 4 +- ...Processor.java => DownSamplingFilter.java} | 26 +- .../downsampling/DownSamplingProcessor.java | 113 ++++++++- ...ueFilter.java => ChangingPointFilter.java} | 67 +++--- .../ChangingPointSamplingProcessor.java | 226 ++++++++++++++++++ .../ChangingValueSamplingProcessor.java | 199 --------------- .../sdt/SwingingDoorTrendingFilter.java | 81 ++++--- ...SwingingDoorTrendingSamplingProcessor.java | 143 +++++------ .../TumblingTimeSamplingProcessor.java | 12 +- .../plugin/builtin/BuiltinPipePlugin.java | 6 +- ...va => ChangingPointSamplingProcessor.java} | 2 +- .../constant/PipeProcessorConstant.java | 30 ++- 12 files changed, 537 insertions(+), 372 deletions(-) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/{changingpoint/ChangingPointSamplingProcessor.java => DownSamplingFilter.java} (55%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/{ChangingValueFilter.java => ChangingPointFilter.java} (51%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java rename iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/{ChangingValueSamplingProcessor.java => ChangingPointSamplingProcessor.java} (95%) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 44c6ef17800b5..eea003742f6b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; +import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingPointSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; @@ -51,7 +51,7 @@ protected void initConstructors() { SwingingDoorTrendingSamplingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); + ChangingPointSamplingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), ThrowingExceptionProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java similarity index 55% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java index cd243fba2140b..b78eec3f7e684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changingpoint/ChangingPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -17,7 +17,29 @@ * under the License. */ -package org.apache.iotdb.db.pipe.processor.downsampling.changingpoint; +package org.apache.iotdb.db.pipe.processor.downsampling; -public class ChangingPointSamplingProcessor { +public abstract class DownSamplingFilter { + + protected long lastPointArrivalTime; + + protected long lastPointEventTime; + + public DownSamplingFilter(long arrivalTime, long eventTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + } + + public void reset(final long arrivalTime, final long eventTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + } + + public long getLastPointArrivalTime() { + return lastPointArrivalTime; + } + + public long getLastPointEventTime() { + return lastPointEventTime; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index fd631772b930a..53f6ce217c8cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -52,7 +53,29 @@ public abstract class DownSamplingProcessor implements PipeProcessor { protected String dataBaseNameWithPathSeparator; - protected PartialPathLastObjectCache pathLastObjectCache; + /** + * The minimum interval of arrival times in milliseconds. Represents the minimum time interval + * between the arrival times of two consecutive events. + */ + protected long arrivalTimeMinInterval; + + /** + * The maximum interval of arrival times in milliseconds. Represents the maximum time interval + * between the arrival times of two consecutive events. + */ + protected long arrivalTimeMaxInterval; + + /** + * The minimum interval of event times in milliseconds. Represents the minimum time interval + * between the event times of two consecutive events. + */ + protected long eventTimeMinInterval; + + /** + * The maximum interval of event times in milliseconds. Represents the maximum time interval + * between the event times of two consecutive events. + */ + protected long eventTimeMaxInterval; @Override public void validate(PipeParameterValidator validator) throws Exception { @@ -71,6 +94,54 @@ public void validate(PipeParameterValidator validator) throws Exception { memoryLimitInBytes); } + public void validatorTimeInterval(final PipeParameterValidator validator) throws Exception { + validator + .validate( + eventTimeMinInterval -> (long) eventTimeMinInterval >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, eventTimeMinInterval), + eventTimeMinInterval) + .validate( + eventTimeMaxInterval -> (long) eventTimeMaxInterval >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, eventTimeMaxInterval), + eventTimeMaxInterval) + .validate( + minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], + String.format( + "%s must be <= %s, but got %s and %s", + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, + eventTimeMinInterval, + eventTimeMaxInterval), + eventTimeMinInterval, + eventTimeMaxInterval) + .validate( + arrivalTimeMinInterval -> (long) arrivalTimeMinInterval >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, arrivalTimeMinInterval), + arrivalTimeMinInterval) + .validate( + arrivalTimeMaxInterval -> (long) arrivalTimeMaxInterval >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, arrivalTimeMaxInterval), + arrivalTimeMaxInterval) + .validate( + minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], + String.format( + "%s must be <= %s, but got %s and %s", + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, + arrivalTimeMinInterval, + arrivalTimeMaxInterval), + arrivalTimeMinInterval, + arrivalTimeMaxInterval); + } + @Override public void customize( PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { @@ -88,11 +159,9 @@ public void customize( .getRegionId())) .getDatabaseName() + TsFileConstant.PATH_SEPARATOR; - - pathLastObjectCache = initPathLastObjectCache(memoryLimitInBytes); } - protected abstract PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes); + protected abstract void initPathLastObjectCache(long memoryLimitInBytes); @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) @@ -138,6 +207,35 @@ protected abstract void processRow( String deviceSuffix, AtomicReference exception); + /** + * Determine the arrival time interval and event time interval. + * + * @return true to indicate that the process does not need to be continued and the data can be + * updated directly. False means that the event is discarded directly. Null means that the + * downsampling algorithm can continue to be executed. + */ + protected Boolean filterArrivalTimeAndEventTime( + final DownSamplingFilter filter, final long arrivalTime, final long eventTime) { + final long arrivalTimeInterval = Math.abs(arrivalTime - filter.getLastPointArrivalTime()); + + if (arrivalTimeInterval >= arrivalTimeMaxInterval) { + return Boolean.TRUE; + } + if (arrivalTimeInterval < arrivalTimeMinInterval) { + return Boolean.FALSE; + } + + final long eventTimeInterval = Math.abs(eventTime - filter.getLastPointEventTime()); + if (eventTimeInterval >= eventTimeMaxInterval) { + return Boolean.TRUE; + } + + if (eventTimeInterval < eventTimeMinInterval) { + return Boolean.FALSE; + } + return null; + } + /** * If data comes in {@link TsFileInsertionEvent}, we will not split it into {@link * TabletInsertionEvent} by default, because the data in {@link TsFileInsertionEvent} is already @@ -165,11 +263,4 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev public void process(Event event, EventCollector eventCollector) throws Exception { eventCollector.collect(event); } - - @Override - public void close() throws Exception { - if (pathLastObjectCache != null) { - pathLastObjectCache.close(); - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java similarity index 51% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java index 7dc8c87c09b2d..04a76c82cb0c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java @@ -19,57 +19,53 @@ package org.apache.iotdb.db.pipe.processor.downsampling.changing; +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingFilter; import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.time.LocalDate; import java.util.Objects; -public class ChangingValueFilter { +public class ChangingPointFilter extends DownSamplingFilter { - private final ChangingValueSamplingProcessor processor; + private static final long estimatedMemory = + RamUsageEstimator.shallowSizeOfInstance(ChangingPointFilter.class); /** - * The last stored time and value we compare current point against lastReadTimestamp and - * lastReadValue + * The maximum absolute difference the user set if the data's value is within + * compressionDeviation, it will be compressed and discarded after compression */ - private long lastStoredTimestamp; + private final double compressionDeviation; - private T lastStoredValue; + private Object lastStoredValue; - public ChangingValueFilter( - final ChangingValueSamplingProcessor processor, + public ChangingPointFilter( + final long arrivalTime, final long firstTimestamp, - final T firstValue) { - this.processor = processor; - init(firstTimestamp, firstValue); + final Object firstValue, + final double compressionDeviation) { + super(arrivalTime, firstTimestamp); + lastStoredValue = firstValue; + this.compressionDeviation = compressionDeviation; } - private void init(final long firstTimestamp, final T firstValue) { - lastStoredTimestamp = firstTimestamp; + private void init(final long arrivalTime, long firstTimestamp, final Object firstValue) { + lastPointArrivalTime = arrivalTime; + lastPointEventTime = firstTimestamp; lastStoredValue = firstValue; } - public boolean filter(final long timestamp, final T value) { + public boolean filter(final long arrivalTime, final long timestamp, final Object value) { try { - return tryFilter(timestamp, value); + return tryFilter(arrivalTime, timestamp, value); } catch (final Exception e) { - init(timestamp, value); + init(arrivalTime, timestamp, value); return true; } } - private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = Math.abs(timestamp - lastStoredTimestamp); - - if (timeDiff <= processor.getCompressionMinTimeInterval()) { - return false; - } - - if (timeDiff >= processor.getCompressionMaxTimeInterval()) { - reset(timestamp, value); - return true; - } - + private boolean tryFilter(final long arrivalTime, final long timestamp, final Object value) { // For non-numerical types, we only compare the value if (value instanceof Boolean || value instanceof String @@ -79,23 +75,28 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } - reset(timestamp, value); + reset(arrivalTime, timestamp, value); return true; } // For other numerical types, we compare the value difference if (Math.abs( Double.parseDouble(lastStoredValue.toString()) - Double.parseDouble(value.toString())) - > processor.getCompressionDeviation()) { - reset(timestamp, value); + > compressionDeviation) { + reset(arrivalTime, timestamp, value); return true; } return false; } - private void reset(final long timestamp, final T value) { - lastStoredTimestamp = timestamp; + public void reset(final long arrivalTime, final long timestamp, final Object value) { + lastPointArrivalTime = arrivalTime; + lastPointEventTime = timestamp; lastStoredValue = value; } + + public long estimatedMemory() { + return estimatedMemory + 64; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java new file mode 100644 index 0000000000000..e097617b1ed13 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changing; + +import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; +import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRow; +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +public class ChangingPointSamplingProcessor extends DownSamplingProcessor { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ChangingPointSamplingProcessor.class); + + /** + * The maximum absolute difference the user set if the data's value is within + * compressionDeviation, it will be compressed and discarded after compression + */ + private double compressionDeviation; + + private PartialPathLastObjectCache pathLastObjectCache; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + super.validate(validator); + + final PipeParameters parameters = validator.getParameters(); + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); + + final boolean isChangingPointProcessor = + parameters.getString("processor").equals("changing-point-sampling-processor"); + + if (isChangingPointProcessor) { + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE); + eventTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE); + eventTimeMaxInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE); + arrivalTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE); + arrivalTimeMaxInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); + } else { + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE); + eventTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE); + eventTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE); + arrivalTimeMinInterval = 0; + arrivalTimeMaxInterval = Long.MAX_VALUE; + } + + validatorTimeInterval(validator); + validator.validate( + compressionDeviation -> (Double) compressionDeviation >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + compressionDeviation), + compressionDeviation); + } + + @Override + public void customize( + PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { + super.customize(parameters, configuration); + + final boolean isChangingPointProcessor = + parameters.getString("processor").equals("changing-point-sampling-processor"); + + if (isChangingPointProcessor) { + LOGGER.info( + "ChangingPointSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}, {}: {}, {}: {}.", + dataBaseNameWithPathSeparator, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, + compressionDeviation, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + arrivalTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + arrivalTimeMaxInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + eventTimeMaxInterval); + } else { + LOGGER.info( + "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", + dataBaseNameWithPathSeparator, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + compressionDeviation, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, + eventTimeMaxInterval); + } + + initPathLastObjectCache(memoryLimitInBytes); + } + + @Override + protected void initPathLastObjectCache(final long memoryLimitInBytes) { + pathLastObjectCache = + new PartialPathLastObjectCache(memoryLimitInBytes) { + @Override + protected long calculateMemoryUsage(ChangingPointFilter object) { + return object.estimatedMemory(); + } + }; + } + + @Override + protected void processRow( + Row row, + RowCollector rowCollector, + String deviceSuffix, + AtomicReference exception) { + final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + final long currentRowTime = row.getTime(); + final long arrivalTime = System.currentTimeMillis(); + + boolean hasNonNullMeasurements = false; + for (int i = 0, size = row.size(); i < size; i++) { + if (row.isNull(i)) { + continue; + } + + final String timeSeriesSuffix = + deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); + final ChangingPointFilter filter = + pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); + + if (Objects.nonNull(filter)) { + final Boolean result = filterArrivalTimeAndEventTime(filter, arrivalTime, currentRowTime); + if (Objects.isNull(result)) { + if (filter.filter(arrivalTime, currentRowTime, row.getObject(i))) { + hasNonNullMeasurements = true; + } else { + remarkableRow.markNull(i); + } + continue; + } + + if (result == Boolean.FALSE) { + remarkableRow.markNull(i); + continue; + } + } else { + pathLastObjectCache.setPartialPathLastObject( + timeSeriesSuffix, + new ChangingPointFilter( + arrivalTime, currentRowTime, row.getObject(i), compressionDeviation)); + } + + hasNonNullMeasurements = true; + } + + if (hasNonNullMeasurements) { + try { + rowCollector.collectRow(remarkableRow); + } catch (IOException e) { + exception.set(e); + } + } + } + + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java deleted file mode 100644 index 6badb70755bab..0000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.processor.downsampling.changing; - -import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; -import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; -import org.apache.iotdb.db.pipe.event.common.row.PipeRow; -import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; -import org.apache.iotdb.pipe.api.access.Row; -import org.apache.iotdb.pipe.api.collector.RowCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import org.apache.tsfile.common.constant.TsFileConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -public class ChangingValueSamplingProcessor extends DownSamplingProcessor { - - private static final Logger LOGGER = - LoggerFactory.getLogger(ChangingValueSamplingProcessor.class); - - /** - * The maximum absolute difference the user set if the data's value is within - * compressionDeviation, it will be compressed and discarded after compression - */ - private double compressionDeviation; - - /** - * The minimum time distance between two stored data points if current point time to the last - * stored point time distance <= compressionMinTimeInterval, current point will NOT be stored - * regardless of compression deviation - */ - private long compressionMinTimeInterval; - - /** - * The maximum time distance between two stored data points if current point time to the last - * stored point time distance >= compressionMaxTimeInterval, current point will be stored - * regardless of compression deviation - */ - private long compressionMaxTimeInterval; - - private PartialPathLastObjectCache> pathLastObjectCache; - - @Override - public void validate(PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final PipeParameters parameters = validator.getParameters(); - compressionDeviation = - parameters.getDoubleOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); - compressionMinTimeInterval = - parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE); - compressionMaxTimeInterval = - parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE); - - validator - .validate( - compressionDeviation -> (Double) compressionDeviation >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - compressionDeviation), - compressionDeviation) - .validate( - compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval), - compressionMinTimeInterval) - .validate( - compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval), - compressionMaxTimeInterval) - .validate( - minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], - String.format( - "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - compressionMaxTimeInterval), - compressionMinTimeInterval, - compressionMaxTimeInterval); - } - - @Override - public void customize( - PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { - super.customize(parameters, configuration); - - LOGGER.info( - "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", - dataBaseNameWithPathSeparator, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - compressionDeviation, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval); - } - - @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { - pathLastObjectCache = - new PartialPathLastObjectCache>(memoryLimitInBytes) { - @Override - protected long calculateMemoryUsage(ChangingValueFilter object) { - return 64; // Long.BYTES * 8 - } - }; - return pathLastObjectCache; - } - - @Override - protected void processRow( - Row row, - RowCollector rowCollector, - String deviceSuffix, - AtomicReference exception) { - final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); - - boolean hasNonNullMeasurements = false; - for (int i = 0, size = row.size(); i < size; i++) { - if (row.isNull(i)) { - continue; - } - - final String timeSeriesSuffix = - deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); - final ChangingValueFilter filter = - pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); - - if (filter != null) { - if (filter.filter(row.getTime(), row.getObject(i))) { - hasNonNullMeasurements = true; - } else { - remarkableRow.markNull(i); - } - } else { - hasNonNullMeasurements = true; - pathLastObjectCache.setPartialPathLastObject( - timeSeriesSuffix, new ChangingValueFilter<>(this, row.getTime(), row.getObject(i))); - } - } - - if (hasNonNullMeasurements) { - try { - rowCollector.collectRow(remarkableRow); - } catch (IOException e) { - exception.set(e); - } - } - } - - double getCompressionDeviation() { - return compressionDeviation; - } - - long getCompressionMinTimeInterval() { - return compressionMinTimeInterval; - } - - long getCompressionMaxTimeInterval() { - return compressionMaxTimeInterval; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index 850cbd3ed0729..f651852ab52a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -19,14 +19,25 @@ package org.apache.iotdb.db.pipe.processor.downsampling.sdt; +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingFilter; import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.time.LocalDate; import java.util.Objects; -public class SwingingDoorTrendingFilter { +public class SwingingDoorTrendingFilter extends DownSamplingFilter { + + private static final long estimatedMemory = + RamUsageEstimator.shallowSizeOfInstance(SwingingDoorTrendingFilter.class); - private final SwingingDoorTrendingSamplingProcessor processor; + /** + * The maximum absolute difference the user set if the data's value is within + * compressionDeviation, it will be compressed and discarded after compression, it will only store + * out of range (time, data) to form the trend + */ + private final double compressionDeviation; /** * The maximum curUpperSlope between the lastStoredPoint to the current point upperDoor can only @@ -46,56 +57,44 @@ public class SwingingDoorTrendingFilter { */ private long lastReadTimestamp; - private T lastReadValue; + private Object lastReadValue; - /** - * The last stored time and value we compare current point against lastReadTimestamp and - * lastReadValue - */ - private long lastStoredTimestamp; - - private T lastStoredValue; + private Object lastStoredValue; public SwingingDoorTrendingFilter( - final SwingingDoorTrendingSamplingProcessor processor, - final long firstTimestamp, - final T firstValue) { - this.processor = processor; - init(firstTimestamp, firstValue); + final long arrivalTime, + final long eventTime, + final Object firstValue, + final double compressionDeviation) { + super(arrivalTime, eventTime); + this.lastStoredValue = firstValue; + this.compressionDeviation = compressionDeviation; } - private void init(final long firstTimestamp, final T firstValue) { + private void init(final long arrivalTime, final long firstTimestamp, final Object firstValue) { upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; lastReadTimestamp = firstTimestamp; lastReadValue = firstValue; - lastStoredTimestamp = firstTimestamp; + lastPointEventTime = firstTimestamp; lastStoredValue = firstValue; + + this.lastPointArrivalTime = arrivalTime; } - public boolean filter(final long timestamp, final T value) { + public boolean filter(final long arrivalTime, final long timestamp, final Object value) { try { - return tryFilter(timestamp, value); + return tryFilter(arrivalTime, timestamp, value); } catch (final Exception e) { - init(timestamp, value); + init(arrivalTime, timestamp, value); return true; } } - private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = timestamp - lastStoredTimestamp; - final long absTimeDiff = Math.abs(timeDiff); - - if (absTimeDiff <= processor.getCompressionMinTimeInterval()) { - return false; - } - - if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) { - reset(timestamp, value); - return true; - } + private boolean tryFilter(final long arrivalTime, final long timestamp, final Object value) { + final long timeDiff = timestamp - lastPointEventTime; // For boolean and string type, we only compare the value if (value instanceof Boolean @@ -106,7 +105,7 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } - reset(timestamp, value); + reset(arrivalTime, timestamp, value); return true; } @@ -115,18 +114,18 @@ private boolean tryFilter(final long timestamp, final T value) { final double lastStoredDoubleValue = Double.parseDouble(lastStoredValue.toString()); final double valueDiff = doubleValue - lastStoredDoubleValue; - final double currentUpperSlope = (valueDiff - processor.getCompressionDeviation()) / timeDiff; + final double currentUpperSlope = (valueDiff - compressionDeviation) / timeDiff; if (currentUpperSlope > upperDoor) { upperDoor = currentUpperSlope; } - final double currentLowerSlope = (valueDiff + processor.getCompressionDeviation()) / timeDiff; + final double currentLowerSlope = (valueDiff + compressionDeviation) / timeDiff; if (currentLowerSlope < lowerDoor) { lowerDoor = currentLowerSlope; } if (upperDoor > lowerDoor) { - lastStoredTimestamp = lastReadTimestamp; + lastPointEventTime = lastReadTimestamp; lastStoredValue = lastReadValue; upperDoor = currentUpperSlope; @@ -144,11 +143,17 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } - private void reset(final long timestamp, final T value) { + public void reset(final long arrivalTime, final long timestamp, final Object value) { upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; - lastStoredTimestamp = timestamp; + lastPointEventTime = timestamp; lastStoredValue = value; + + lastPointArrivalTime = arrivalTime; + } + + public long estimatedMemory() { + return estimatedMemory + 64; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java index 18a453e2e8dc0..68b0f01fb6220 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java @@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; public class SwingingDoorTrendingSamplingProcessor extends DownSamplingProcessor { @@ -49,21 +51,7 @@ public class SwingingDoorTrendingSamplingProcessor extends DownSamplingProcessor */ private double compressionDeviation; - /** - * The minimum time distance between two stored data points if current point time to the last - * stored point time distance <= compressionMinTimeInterval, current point will NOT be stored - * regardless of compression deviation - */ - private long compressionMinTimeInterval; - - /** - * The maximum time distance between two stored data points if current point time to the last - * stored point time distance >= compressionMaxTimeInterval, current point will be stored - * regardless of compression deviation - */ - private long compressionMaxTimeInterval; - - private PartialPathLastObjectCache> pathLastObjectCache; + private PartialPathLastObjectCache pathLastObjectCache; @Override public void validate(PipeParameterValidator validator) throws Exception { @@ -74,47 +62,29 @@ public void validate(PipeParameterValidator validator) throws Exception { parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE); - compressionMinTimeInterval = + eventTimeMinInterval = + parameters.getLongOrDefault( + Arrays.asList( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY), + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE); + eventTimeMaxInterval = + parameters.getLongOrDefault( + Arrays.asList( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY), + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE); + arrivalTimeMinInterval = parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE); - compressionMaxTimeInterval = + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE); + arrivalTimeMaxInterval = parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE); - - validator - .validate( - compressionDeviation -> (Double) compressionDeviation >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, - compressionDeviation), - compressionDeviation) - .validate( - compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval), - compressionMinTimeInterval) - .validate( - compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval), - compressionMaxTimeInterval) - .validate( - minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], - String.format( - "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - compressionMaxTimeInterval), - compressionMinTimeInterval, - compressionMaxTimeInterval); + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); + + validatorTimeInterval(validator); + initPathLastObjectCache(memoryLimitInBytes); } @Override @@ -123,26 +93,31 @@ public void customize( super.customize(parameters, configuration); LOGGER.info( - "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", + "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}, {}: {}, {}: {}.", dataBaseNameWithPathSeparator, PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, compressionDeviation, - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval); + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + arrivalTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + arrivalTimeMaxInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + eventTimeMaxInterval); + + initPathLastObjectCache(memoryLimitInBytes); } @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { + protected void initPathLastObjectCache(final long memoryLimitInBytes) { pathLastObjectCache = - new PartialPathLastObjectCache>(memoryLimitInBytes) { + new PartialPathLastObjectCache(memoryLimitInBytes) { @Override - protected long calculateMemoryUsage(SwingingDoorTrendingFilter object) { - return 64; // Long.BYTES * 8 + protected long calculateMemoryUsage(SwingingDoorTrendingFilter object) { + return object.estimatedMemory(); } }; - return pathLastObjectCache; } @Override @@ -152,6 +127,8 @@ protected void processRow( String deviceSuffix, AtomicReference exception) { final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + final long currentRowTime = row.getTime(); + final long arrivalTime = System.currentTimeMillis(); boolean hasNonNullMeasurements = false; for (int i = 0, size = row.size(); i < size; i++) { @@ -164,18 +141,29 @@ protected void processRow( final SwingingDoorTrendingFilter filter = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); - if (filter != null) { - if (filter.filter(row.getTime(), row.getObject(i))) { - hasNonNullMeasurements = true; - } else { + if (Objects.nonNull(filter)) { + final Boolean result = filterArrivalTimeAndEventTime(filter, arrivalTime, currentRowTime); + if (Objects.isNull(result)) { + if (filter.filter(arrivalTime, currentRowTime, row.getObject(i))) { + hasNonNullMeasurements = true; + } else { + remarkableRow.markNull(i); + } + continue; + } + + if (result == Boolean.FALSE) { remarkableRow.markNull(i); + continue; } } else { - hasNonNullMeasurements = true; pathLastObjectCache.setPartialPathLastObject( timeSeriesSuffix, - new SwingingDoorTrendingFilter<>(this, row.getTime(), row.getObject(i))); + new SwingingDoorTrendingFilter( + arrivalTime, currentRowTime, row.getObject(i), compressionDeviation)); } + + hasNonNullMeasurements = true; } if (hasNonNullMeasurements) { @@ -187,15 +175,10 @@ protected void processRow( } } - double getCompressionDeviation() { - return compressionDeviation; - } - - long getCompressionMinTimeInterval() { - return compressionMinTimeInterval; - } - - long getCompressionMaxTimeInterval() { - return compressionMaxTimeInterval; + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java index 665f5781801a7..dc1a84ef20c0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java @@ -82,10 +82,12 @@ public void customize( memoryLimitInBytes, PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY, shouldSplitFile); + + initPathLastObjectCache(memoryLimitInBytes); } @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { + protected void initPathLastObjectCache(final long memoryLimitInBytes) { pathLastObjectCache = new PartialPathLastObjectCache(memoryLimitInBytes) { @Override @@ -93,7 +95,6 @@ protected long calculateMemoryUsage(Long object) { return Long.BYTES; } }; - return pathLastObjectCache; } @Override @@ -132,4 +133,11 @@ protected void processRow( } } } + + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 67f3a0265afbf..3e0685021c224 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -36,7 +36,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingPointSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; @@ -63,7 +63,9 @@ public enum BuiltinPipePlugin { "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), + "changing-value-sampling-processor", ChangingPointSamplingProcessor.class), + CHANGING_POINT_SAMPLING_PROCESSOR( + "changing-point-sampling-processor", ChangingPointSamplingProcessor.class), THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java similarity index 95% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java index 0cb723dca8cc2..2215a202e8bdd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java @@ -27,4 +27,4 @@ * be imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the changing-value-sampling-processor. */ -public class ChangingValueSamplingProcessor extends PlaceHolderProcessor {} +public class ChangingPointSamplingProcessor extends PlaceHolderProcessor {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index f8aef880bd966..5ac550c13dae3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -61,10 +61,20 @@ public class PipeProcessorConstant { public static final double PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE = 0; public static final String PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY = "processor.sdt.min-time-interval"; - public static final long PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0; public static final String PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY = "processor.sdt.max-time-interval"; - public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; + public static final String PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL = + "processor.sdt.arrival-time.min-interval"; + public static final String PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL = + "processor.sdt.arrival-time.max-interval"; + public static final String PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL = + "processor.sdt.event-time.min-interval"; + public static final String PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL = + "processor.sdt.event-time.max-interval"; + public static final long PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final long PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; public static final String PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION = "processor.changing-value.compression-deviation"; @@ -77,6 +87,22 @@ public class PipeProcessorConstant { public static final long PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; + public static final String PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL = + "processor.changing-point.arrival-time.min-interval"; + public static final String PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL = + "processor.changing-point.arrival-time.max-interval"; + public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL = + "processor.changing-point.event-time.min-interval"; + public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL = + "processor.changing-point.event-time.max-interval"; + public static final String PROCESSOR_CHANGING_POINT_VALUE_INTERVAL = + "processor.changing-point.value-interval"; + public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final double PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE = 0; + public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series"; From e866510636bd79ca565521aad671e280db9389fe Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 5 Dec 2024 21:45:16 +0800 Subject: [PATCH 3/8] add IT --- .../it/autocreate/IoTDBPipeProcessorIT.java | 164 ++++++++++++++++++ .../PipeDataRegionProcessorConstructor.java | 3 + .../downsampling/DownSamplingFilter.java | 15 +- .../downsampling/DownSamplingProcessor.java | 33 ++-- .../changing/ChangingPointFilter.java | 5 +- .../ChangingPointSamplingProcessor.java | 24 ++- ...SwingingDoorTrendingSamplingProcessor.java | 14 +- .../plugin/builtin/BuiltinPipePlugin.java | 1 + 8 files changed, 229 insertions(+), 30 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java index 4556416f5fbe3..aac4f6fb9efbb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java @@ -144,4 +144,168 @@ public void testTumblingTimeSamplingProcessor() throws Exception { receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); } } + + @Test + public void testChangingValueProcessor() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + // Test empty tsFile parsing + // Assert that an empty tsFile will not be parsed by the processor then block + // the subsequent data processing + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + processorAttributes.put("processor", "changing-value-sampling-processor"); + processorAttributes.put("processor.changing-value.compression-deviation", "10"); + processorAttributes.put("processor.changing-value.min-time-interval", "10000"); + processorAttributes.put("processor.changing-value.max-time-interval", "20000"); + + connectorAttributes.put("sink", "iotdb-thrift-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 10)", + "insert into root.vehicle.d0(time, s1) values (9999, 20)", + "insert into root.vehicle.d0(time, s1) values (10000, 30)", + "insert into root.vehicle.d0(time, s1) values (19000, 40)", + "insert into root.vehicle.d0(time, s1) values (20000, 50)", + "insert into root.vehicle.d0(time, s1) values (29001, 60)", + "insert into root.vehicle.d0(time, s1) values (50000, 70)", + "insert into root.vehicle.d0(time, s1) values (60000, 71)", + "flush"))) { + return; + } + + final Set expectedResSet = new HashSet<>(); + + expectedResSet.add("0,10.0,"); + expectedResSet.add("10000,30.0,"); + expectedResSet.add("20000,50.0,"); + expectedResSet.add("50000,70.0,"); + expectedResSet.add("60000,80.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); + } + } + + @Test + public void testChangingPointProcessor() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + // Test empty tsFile parsing + // Assert that an empty tsFile will not be parsed by the processor then block + // the subsequent data processing + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + processorAttributes.put("processor", "changing-point-sampling-processor"); + processorAttributes.put("processor.changing-point.compression-deviation", "10"); + processorAttributes.put("processor.changing-point.arrival-time.min-interval", "10000"); + processorAttributes.put("processor.changing-point.arrival-time.max-interval", "30000"); + processorAttributes.put("processor.changing-point.event-time.min-interval", "10000"); + processorAttributes.put("processor.changing-point.event-time.max-interval", "30000"); + + connectorAttributes.put("sink", "iotdb-thrift-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 10)", + "insert into root.vehicle.d0(time, s1) values (100000, 20)", + "insert into root.vehicle.d0(time, s1) values (110000, 30)"))) { + return; + } + + Thread.sleep(10000); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (100000, 40)", + "insert into root.vehicle.d0(time, s1) values (400000, 50)", + "insert into root.vehicle.d0(time, s1) values (500000, 60)"))) { + return; + } + + Thread.sleep(10000); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.vehicle.d0(time, s1) values (100000, 41)", "flush"))) { + return; + } + + final Set expectedResSet = new HashSet<>(); + + expectedResSet.add("0,10.0,"); + expectedResSet.add("100000,40.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index eea003742f6b5..c2ad29a5ffc2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -52,6 +52,9 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), ChangingPointSamplingProcessor::new); + pluginConstructors.put( + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR.getPipePluginName(), + ChangingPointSamplingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), ThrowingExceptionProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java index b78eec3f7e684..b8cf61c715b21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -21,11 +21,20 @@ public abstract class DownSamplingFilter { + protected boolean isFilterArrivalTime = true; + protected long lastPointArrivalTime; protected long lastPointEventTime; - public DownSamplingFilter(long arrivalTime, long eventTime) { + public DownSamplingFilter( + final long arrivalTime, final long eventTime, final boolean isFilterArrivalTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + this.isFilterArrivalTime = isFilterArrivalTime; + } + + public DownSamplingFilter(final long arrivalTime, final long eventTime) { this.lastPointArrivalTime = arrivalTime; this.lastPointEventTime = eventTime; } @@ -42,4 +51,8 @@ public long getLastPointArrivalTime() { public long getLastPointEventTime() { return lastPointEventTime; } + + public boolean isFilterArrivalTime() { + return isFilterArrivalTime; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 53f6ce217c8cf..2d92290e9813d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; -import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -99,21 +98,19 @@ public void validatorTimeInterval(final PipeParameterValidator validator) throws .validate( eventTimeMinInterval -> (long) eventTimeMinInterval >= 0, String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, eventTimeMinInterval), + "%s must be >= 0, but got %s", "event-time.min-interval", eventTimeMinInterval), eventTimeMinInterval) .validate( eventTimeMaxInterval -> (long) eventTimeMaxInterval >= 0, String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, eventTimeMaxInterval), + "%s must be >= 0, but got %s", "event-time.max-interval", eventTimeMaxInterval), eventTimeMaxInterval) .validate( minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], String.format( "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, + "event-time.min-interval", + "event-time.max-interval", eventTimeMinInterval, eventTimeMaxInterval), eventTimeMinInterval, @@ -121,21 +118,19 @@ public void validatorTimeInterval(final PipeParameterValidator validator) throws .validate( arrivalTimeMinInterval -> (long) arrivalTimeMinInterval >= 0, String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, arrivalTimeMinInterval), + "%s must be >= 0, but got %s", "arrival-time.min-interval", arrivalTimeMinInterval), arrivalTimeMinInterval) .validate( arrivalTimeMaxInterval -> (long) arrivalTimeMaxInterval >= 0, String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, arrivalTimeMaxInterval), + "%s must be >= 0, but got %s", "arrival-time.max-interval", arrivalTimeMaxInterval), arrivalTimeMaxInterval) .validate( minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], String.format( "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, + "arrival-time.min-interval", + "arrival-time.max-interval", arrivalTimeMinInterval, arrivalTimeMaxInterval), arrivalTimeMinInterval, @@ -218,11 +213,13 @@ protected Boolean filterArrivalTimeAndEventTime( final DownSamplingFilter filter, final long arrivalTime, final long eventTime) { final long arrivalTimeInterval = Math.abs(arrivalTime - filter.getLastPointArrivalTime()); - if (arrivalTimeInterval >= arrivalTimeMaxInterval) { - return Boolean.TRUE; - } - if (arrivalTimeInterval < arrivalTimeMinInterval) { - return Boolean.FALSE; + if (filter.isFilterArrivalTime()) { + if (arrivalTimeInterval >= arrivalTimeMaxInterval) { + return Boolean.TRUE; + } + if (arrivalTimeInterval < arrivalTimeMinInterval) { + return Boolean.FALSE; + } } final long eventTimeInterval = Math.abs(eventTime - filter.getLastPointEventTime()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java index 04a76c82cb0c6..40418a61ddae1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java @@ -44,8 +44,9 @@ public ChangingPointFilter( final long arrivalTime, final long firstTimestamp, final Object firstValue, - final double compressionDeviation) { - super(arrivalTime, firstTimestamp); + final double compressionDeviation, + final boolean isFilterArrivalTime) { + super(arrivalTime, firstTimestamp, isFilterArrivalTime); lastStoredValue = firstValue; this.compressionDeviation = compressionDeviation; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java index e097617b1ed13..1f7a305aee400 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.processor.downsampling.changing; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; import org.apache.iotdb.db.pipe.event.common.row.PipeRow; @@ -49,6 +50,8 @@ public class ChangingPointSamplingProcessor extends DownSamplingProcessor { */ private double compressionDeviation; + private boolean isFilterArrivalTime = true; + private PartialPathLastObjectCache pathLastObjectCache; @Override @@ -62,9 +65,12 @@ public void validate(PipeParameterValidator validator) throws Exception { PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); final boolean isChangingPointProcessor = - parameters.getString("processor").equals("changing-point-sampling-processor"); + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR + .getPipePluginName() + .equals(parameters.getString("processor")); if (isChangingPointProcessor) { + isFilterArrivalTime = true; compressionDeviation = parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, @@ -88,6 +94,7 @@ public void validate(PipeParameterValidator validator) throws Exception { PipeProcessorConstant .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); } else { + isFilterArrivalTime = false; compressionDeviation = parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, @@ -96,10 +103,11 @@ public void validate(PipeParameterValidator validator) throws Exception { parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE); - eventTimeMinInterval = + eventTimeMaxInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE); + // will not be used arrivalTimeMinInterval = 0; arrivalTimeMaxInterval = Long.MAX_VALUE; } @@ -194,15 +202,23 @@ protected void processRow( continue; } - if (result == Boolean.FALSE) { + // It will not be null + if (!result) { remarkableRow.markNull(i); continue; } + + // The arrival time or event time is greater than the maximum time interval + filter.reset(arrivalTime, currentRowTime, row.getObject(i)); } else { pathLastObjectCache.setPartialPathLastObject( timeSeriesSuffix, new ChangingPointFilter( - arrivalTime, currentRowTime, row.getObject(i), compressionDeviation)); + arrivalTime, + currentRowTime, + row.getObject(i), + compressionDeviation, + isFilterArrivalTime)); } hasNonNullMeasurements = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java index 68b0f01fb6220..596dc77c6b6a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java @@ -97,13 +97,13 @@ public void customize( dataBaseNameWithPathSeparator, PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, compressionDeviation, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL, arrivalTimeMinInterval, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL, arrivalTimeMaxInterval, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL, eventTimeMinInterval, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, eventTimeMaxInterval); initPathLastObjectCache(memoryLimitInBytes); @@ -152,10 +152,14 @@ protected void processRow( continue; } - if (result == Boolean.FALSE) { + // It will not be null + if (result) { remarkableRow.markNull(i); continue; } + + // The arrival time or event time is greater than the maximum time interval + filter.reset(arrivalTime, currentRowTime, row.getObject(i)); } else { pathLastObjectCache.setPartialPathLastObject( timeSeriesSuffix, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 3e0685021c224..aa5ea20ef5e69 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -139,6 +139,7 @@ public String getClassName() { TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), + CHANGING_POINT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), From 9856951f6fab5fdf114d24ab1cbeb3330afc335e Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 6 Dec 2024 12:18:39 +0800 Subject: [PATCH 4/8] fix --- .../apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java index aac4f6fb9efbb..d53cd2085049b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java @@ -214,7 +214,6 @@ public void testChangingValueProcessor() throws Exception { expectedResSet.add("10000,30.0,"); expectedResSet.add("20000,50.0,"); expectedResSet.add("50000,70.0,"); - expectedResSet.add("60000,80.0,"); TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); From 6be6d782e782b825972684705bf5d8e31a1a35b4 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 6 Dec 2024 15:20:03 +0800 Subject: [PATCH 5/8] update ChangingPointSamplingProcessor --- .../processor/downsampling/DownSamplingFilter.java | 10 +++++----- .../downsampling/DownSamplingProcessor.java | 2 +- .../changing/ChangingPointSamplingProcessor.java | 12 +++++++----- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java index b8cf61c715b21..3efc680ce2b86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -21,17 +21,17 @@ public abstract class DownSamplingFilter { - protected boolean isFilterArrivalTime = true; + protected boolean isFilteredByArrivalTime = true; protected long lastPointArrivalTime; protected long lastPointEventTime; public DownSamplingFilter( - final long arrivalTime, final long eventTime, final boolean isFilterArrivalTime) { + final long arrivalTime, final long eventTime, final boolean isFilteredByArrivalTime) { this.lastPointArrivalTime = arrivalTime; this.lastPointEventTime = eventTime; - this.isFilterArrivalTime = isFilterArrivalTime; + this.isFilteredByArrivalTime = isFilteredByArrivalTime; } public DownSamplingFilter(final long arrivalTime, final long eventTime) { @@ -52,7 +52,7 @@ public long getLastPointEventTime() { return lastPointEventTime; } - public boolean isFilterArrivalTime() { - return isFilterArrivalTime; + public boolean isFilteredByArrivalTime() { + return isFilteredByArrivalTime; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 2d92290e9813d..7329037cfc3eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -213,7 +213,7 @@ protected Boolean filterArrivalTimeAndEventTime( final DownSamplingFilter filter, final long arrivalTime, final long eventTime) { final long arrivalTimeInterval = Math.abs(arrivalTime - filter.getLastPointArrivalTime()); - if (filter.isFilterArrivalTime()) { + if (filter.isFilteredByArrivalTime()) { if (arrivalTimeInterval >= arrivalTimeMaxInterval) { return Boolean.TRUE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java index 1f7a305aee400..d3b0401fe245a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -50,7 +50,7 @@ public class ChangingPointSamplingProcessor extends DownSamplingProcessor { */ private double compressionDeviation; - private boolean isFilterArrivalTime = true; + private boolean isFilteredByArrivalTime = true; private PartialPathLastObjectCache pathLastObjectCache; @@ -70,7 +70,7 @@ public void validate(PipeParameterValidator validator) throws Exception { .equals(parameters.getString("processor")); if (isChangingPointProcessor) { - isFilterArrivalTime = true; + isFilteredByArrivalTime = true; compressionDeviation = parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, @@ -94,7 +94,7 @@ public void validate(PipeParameterValidator validator) throws Exception { PipeProcessorConstant .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); } else { - isFilterArrivalTime = false; + isFilteredByArrivalTime = false; compressionDeviation = parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, @@ -128,7 +128,9 @@ public void customize( super.customize(parameters, configuration); final boolean isChangingPointProcessor = - parameters.getString("processor").equals("changing-point-sampling-processor"); + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR + .getPipePluginName() + .equals(parameters.getString("processor")); if (isChangingPointProcessor) { LOGGER.info( @@ -218,7 +220,7 @@ protected void processRow( currentRowTime, row.getObject(i), compressionDeviation, - isFilterArrivalTime)); + isFilteredByArrivalTime)); } hasNonNullMeasurements = true; From fdb828fd48c6b740ba9ed38e2fa81c878da2b393 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 20 Dec 2024 18:18:59 +0800 Subject: [PATCH 6/8] Increase time precision & modify configuration item names & accurate memory management & modify code style --- .../it/autocreate/IoTDBPipeProcessorIT.java | 2 +- .../downsampling/DownSamplingFilter.java | 13 ++++++ .../downsampling/DownSamplingProcessor.java | 25 +++++++++- .../changing/ChangingPointFilter.java | 25 +++++----- .../ChangingPointSamplingProcessor.java | 46 ++++++++++++------- .../sdt/SwingingDoorTrendingFilter.java | 13 ++++-- ...SwingingDoorTrendingSamplingProcessor.java | 27 +++++++---- .../TumblingTimeSamplingProcessor.java | 2 - .../constant/PipeProcessorConstant.java | 6 +-- 9 files changed, 112 insertions(+), 47 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java index d53cd2085049b..c7dae77c5673c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java @@ -248,7 +248,7 @@ public void testChangingPointProcessor() throws Exception { extractorAttributes.put("source.realtime.mode", "log"); processorAttributes.put("processor", "changing-point-sampling-processor"); - processorAttributes.put("processor.changing-point.compression-deviation", "10"); + processorAttributes.put("processor.changing-point.value-variation", "10"); processorAttributes.put("processor.changing-point.arrival-time.min-interval", "10000"); processorAttributes.put("processor.changing-point.arrival-time.max-interval", "30000"); processorAttributes.put("processor.changing-point.event-time.min-interval", "10000"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java index 3efc680ce2b86..a74bee8020c7f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -19,8 +19,21 @@ package org.apache.iotdb.db.pipe.processor.downsampling; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.time.LocalDate; + public abstract class DownSamplingFilter { + protected static final long SIZE_OF_DATE = + RamUsageEstimator.shallowSizeOfInstance(LocalDate.class); + + protected static final long SIZE_OF_LONG = RamUsageEstimator.shallowSizeOfInstance(Long.class); + + protected static final long SIZE_OF_BINARY = + RamUsageEstimator.shallowSizeOfInstance(Binary.class); + protected boolean isFilteredByArrivalTime = true; protected long lastPointArrivalTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 7329037cfc3eb..170d8f2822b82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.EventCollector; @@ -37,6 +38,7 @@ import org.apache.tsfile.common.constant.TsFileConstant; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; @@ -46,6 +48,26 @@ public abstract class DownSamplingProcessor implements PipeProcessor { + protected static final CurrentTimeFunction currentTime; + + @FunctionalInterface + protected interface CurrentTimeFunction { + long apply(); + } + + static { + switch (TimestampPrecisionUtils.currPrecision) { + case NANOSECONDS: + currentTime = System::nanoTime; + break; + case MICROSECONDS: + currentTime = () -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); + break; + default: + currentTime = System::currentTimeMillis; + } + } + protected long memoryLimitInBytes; protected boolean shouldSplitFile; @@ -91,9 +113,10 @@ public void validate(PipeParameterValidator validator) throws Exception { "%s must be > 0, but got %s", PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, memoryLimitInBytes), memoryLimitInBytes); + initPathLastObjectCache(memoryLimitInBytes); } - public void validatorTimeInterval(final PipeParameterValidator validator) throws Exception { + public void validateTimeInterval(final PipeParameterValidator validator) throws Exception { validator .validate( eventTimeMinInterval -> (long) eventTimeMinInterval >= 0, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java index 40418a61ddae1..945aa4ab01d1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java @@ -32,6 +32,8 @@ public class ChangingPointFilter extends DownSamplingFilter { private static final long estimatedMemory = RamUsageEstimator.shallowSizeOfInstance(ChangingPointFilter.class); + private final long estimatedSize; + /** * The maximum absolute difference the user set if the data's value is within * compressionDeviation, it will be compressed and discarded after compression @@ -49,11 +51,18 @@ public ChangingPointFilter( super(arrivalTime, firstTimestamp, isFilterArrivalTime); lastStoredValue = firstValue; this.compressionDeviation = compressionDeviation; + + if (lastStoredValue instanceof Binary) { + estimatedSize = estimatedMemory + SIZE_OF_BINARY; + } else if (lastStoredValue instanceof LocalDate) { + estimatedSize = estimatedMemory + SIZE_OF_DATE; + } else { + estimatedSize = estimatedMemory + SIZE_OF_LONG; + } } - private void init(final long arrivalTime, long firstTimestamp, final Object firstValue) { - lastPointArrivalTime = arrivalTime; - lastPointEventTime = firstTimestamp; + public void reset(final long arrivalTime, long firstTimestamp, final Object firstValue) { + reset(arrivalTime, firstTimestamp); lastStoredValue = firstValue; } @@ -61,7 +70,7 @@ public boolean filter(final long arrivalTime, final long timestamp, final Object try { return tryFilter(arrivalTime, timestamp, value); } catch (final Exception e) { - init(arrivalTime, timestamp, value); + reset(arrivalTime, timestamp, value); return true; } } @@ -91,13 +100,7 @@ private boolean tryFilter(final long arrivalTime, final long timestamp, final Ob return false; } - public void reset(final long arrivalTime, final long timestamp, final Object value) { - lastPointArrivalTime = arrivalTime; - lastPointEventTime = timestamp; - lastStoredValue = value; - } - public long estimatedMemory() { - return estimatedMemory + 64; + return estimatedSize; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java index d3b0401fe245a..b0275ae8fb02e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; @@ -37,6 +38,7 @@ import java.io.IOException; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class ChangingPointSamplingProcessor extends DownSamplingProcessor { @@ -73,46 +75,60 @@ public void validate(PipeParameterValidator validator) throws Exception { isFilteredByArrivalTime = true; compressionDeviation = parameters.getDoubleOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE); + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION_DEFAULT_VALUE); eventTimeMinInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); eventTimeMaxInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); arrivalTimeMinInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, - PipeProcessorConstant - .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); arrivalTimeMaxInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, - PipeProcessorConstant - .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); } else { isFilteredByArrivalTime = false; compressionDeviation = parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE); + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION_DEFAULT_VALUE); eventTimeMinInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); eventTimeMaxInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); // will not be used arrivalTimeMinInterval = 0; arrivalTimeMaxInterval = Long.MAX_VALUE; } - validatorTimeInterval(validator); + validateTimeInterval(validator); validator.validate( compressionDeviation -> (Double) compressionDeviation >= 0, String.format( @@ -136,7 +152,7 @@ public void customize( LOGGER.info( "ChangingPointSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}, {}: {}, {}: {}.", dataBaseNameWithPathSeparator, - PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION, compressionDeviation, PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, arrivalTimeMinInterval, @@ -157,8 +173,6 @@ public void customize( PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, eventTimeMaxInterval); } - - initPathLastObjectCache(memoryLimitInBytes); } @Override @@ -180,7 +194,7 @@ protected void processRow( AtomicReference exception) { final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); final long currentRowTime = row.getTime(); - final long arrivalTime = System.currentTimeMillis(); + final long arrivalTime = currentTime.apply(); boolean hasNonNullMeasurements = false; for (int i = 0, size = row.size(); i < size; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index f651852ab52a7..eaa8e2f8ae6cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -32,6 +32,8 @@ public class SwingingDoorTrendingFilter extends DownSamplingFilter { private static final long estimatedMemory = RamUsageEstimator.shallowSizeOfInstance(SwingingDoorTrendingFilter.class); + private final long estimatedSize; + /** * The maximum absolute difference the user set if the data's value is within * compressionDeviation, it will be compressed and discarded after compression, it will only store @@ -69,6 +71,13 @@ public SwingingDoorTrendingFilter( super(arrivalTime, eventTime); this.lastStoredValue = firstValue; this.compressionDeviation = compressionDeviation; + if (lastStoredValue instanceof Binary) { + estimatedSize = estimatedMemory + SIZE_OF_BINARY; + } else if (lastStoredValue instanceof LocalDate) { + estimatedSize = estimatedMemory + SIZE_OF_DATE; + } else { + estimatedSize = estimatedMemory + SIZE_OF_LONG; + } } private void init(final long arrivalTime, final long firstTimestamp, final Object firstValue) { @@ -144,13 +153,11 @@ private boolean tryFilter(final long arrivalTime, final long timestamp, final Ob } public void reset(final long arrivalTime, final long timestamp, final Object value) { + super.reset(arrivalTime, timestamp); upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; - lastPointEventTime = timestamp; lastStoredValue = value; - - lastPointArrivalTime = arrivalTime; } public long estimatedMemory() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java index 596dc77c6b6a7..82a3af7fc8ce4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; @@ -37,6 +38,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class SwingingDoorTrendingSamplingProcessor extends DownSamplingProcessor { @@ -67,24 +69,31 @@ public void validate(PipeParameterValidator validator) throws Exception { Arrays.asList( PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL, PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY), - PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); eventTimeMaxInterval = parameters.getLongOrDefault( Arrays.asList( PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY), - PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); arrivalTimeMinInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL, - PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); arrivalTimeMaxInterval = parameters.getLongOrDefault( PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL, - PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE); + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); - validatorTimeInterval(validator); - initPathLastObjectCache(memoryLimitInBytes); + validateTimeInterval(validator); } @Override @@ -105,8 +114,6 @@ public void customize( eventTimeMinInterval, PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, eventTimeMaxInterval); - - initPathLastObjectCache(memoryLimitInBytes); } @Override @@ -128,7 +135,7 @@ protected void processRow( AtomicReference exception) { final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); final long currentRowTime = row.getTime(); - final long arrivalTime = System.currentTimeMillis(); + final long arrivalTime = currentTime.apply(); boolean hasNonNullMeasurements = false; for (int i = 0, size = row.size(); i < size; i++) { @@ -153,7 +160,7 @@ protected void processRow( } // It will not be null - if (result) { + if (!result) { remarkableRow.markNull(i); continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java index dc1a84ef20c0b..f906196a868b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java @@ -82,8 +82,6 @@ public void customize( memoryLimitInBytes, PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY, shouldSplitFile); - - initPathLastObjectCache(memoryLimitInBytes); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 5ac550c13dae3..f2fb2731735b4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -95,13 +95,13 @@ public class PipeProcessorConstant { "processor.changing-point.event-time.min-interval"; public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL = "processor.changing-point.event-time.max-interval"; - public static final String PROCESSOR_CHANGING_POINT_VALUE_INTERVAL = - "processor.changing-point.value-interval"; + public static final String PROCESSOR_CHANGING_POINT_VALUE_VARIATION = + "processor.changing-point.value-variation"; public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; - public static final double PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE = 0; + public static final double PROCESSOR_CHANGING_POINT_VALUE_VARIATION_DEFAULT_VALUE = 0; public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series"; From bec47d856cad01352b11b27ca95692e98ae8dc58 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 20 Dec 2024 18:37:06 +0800 Subject: [PATCH 7/8] DownSamplingFilter implements Accountable interface --- .../db/pipe/processor/downsampling/DownSamplingFilter.java | 3 ++- .../processor/downsampling/changing/ChangingPointFilter.java | 2 +- .../downsampling/changing/ChangingPointSamplingProcessor.java | 2 +- .../downsampling/sdt/SwingingDoorTrendingFilter.java | 4 ++-- .../sdt/SwingingDoorTrendingSamplingProcessor.java | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java index a74bee8020c7f..012fe2193f526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -19,12 +19,13 @@ package org.apache.iotdb.db.pipe.processor.downsampling; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.time.LocalDate; -public abstract class DownSamplingFilter { +public abstract class DownSamplingFilter implements Accountable { protected static final long SIZE_OF_DATE = RamUsageEstimator.shallowSizeOfInstance(LocalDate.class); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java index 945aa4ab01d1d..5e5e60196ad9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java @@ -100,7 +100,7 @@ private boolean tryFilter(final long arrivalTime, final long timestamp, final Ob return false; } - public long estimatedMemory() { + public long ramBytesUsed() { return estimatedSize; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java index b0275ae8fb02e..fe40111cf39fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -181,7 +181,7 @@ protected void initPathLastObjectCache(final long memoryLimitInBytes) { new PartialPathLastObjectCache(memoryLimitInBytes) { @Override protected long calculateMemoryUsage(ChangingPointFilter object) { - return object.estimatedMemory(); + return object.ramBytesUsed(); } }; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index eaa8e2f8ae6cc..ee7e201fa30ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -160,7 +160,7 @@ public void reset(final long arrivalTime, final long timestamp, final Object val lastStoredValue = value; } - public long estimatedMemory() { - return estimatedMemory + 64; + public long ramBytesUsed() { + return estimatedSize; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java index 82a3af7fc8ce4..3925e4b3ade70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java @@ -122,7 +122,7 @@ protected void initPathLastObjectCache(final long memoryLimitInBytes) { new PartialPathLastObjectCache(memoryLimitInBytes) { @Override protected long calculateMemoryUsage(SwingingDoorTrendingFilter object) { - return object.estimatedMemory(); + return object.ramBytesUsed(); } }; } From 9bb372f0adba97c34f3de8e25f2f9fffa4b341a2 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 20 Dec 2024 18:38:45 +0800 Subject: [PATCH 8/8] modify code format --- .../db/pipe/processor/downsampling/DownSamplingProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 170d8f2822b82..2d808f2a8f71d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -113,7 +113,6 @@ public void validate(PipeParameterValidator validator) throws Exception { "%s must be > 0, but got %s", PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, memoryLimitInBytes), memoryLimitInBytes); - initPathLastObjectCache(memoryLimitInBytes); } public void validateTimeInterval(final PipeParameterValidator validator) throws Exception { @@ -177,6 +176,7 @@ public void customize( .getRegionId())) .getDatabaseName() + TsFileConstant.PATH_SEPARATOR; + initPathLastObjectCache(memoryLimitInBytes); } protected abstract void initPathLastObjectCache(long memoryLimitInBytes);