diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java index dfa7957a5746d..99a9533108bd1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java @@ -150,4 +150,167 @@ public void testTumblingTimeSamplingProcessor() throws Exception { receiverEnv, "select * from root.vehicle.**", "Time,root.vehicle.d0.s1,", expectedResSet); } } + + @Test + public void testChangingValueProcessor() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + // Test empty tsFile parsing + // Assert that an empty tsFile will not be parsed by the processor then block + // the subsequent data processing + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + processorAttributes.put("processor", "changing-value-sampling-processor"); + processorAttributes.put("processor.changing-value.compression-deviation", "10"); + processorAttributes.put("processor.changing-value.min-time-interval", "10000"); + 
processorAttributes.put("processor.changing-value.max-time-interval", "20000"); + + connectorAttributes.put("sink", "iotdb-thrift-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 10)", + "insert into root.vehicle.d0(time, s1) values (9999, 20)", + "insert into root.vehicle.d0(time, s1) values (10000, 30)", + "insert into root.vehicle.d0(time, s1) values (19000, 40)", + "insert into root.vehicle.d0(time, s1) values (20000, 50)", + "insert into root.vehicle.d0(time, s1) values (29001, 60)", + "insert into root.vehicle.d0(time, s1) values (50000, 70)", + "insert into root.vehicle.d0(time, s1) values (60000, 71)", + "flush"))) { + return; + } + + final Set expectedResSet = new HashSet<>(); + + expectedResSet.add("0,10.0,"); + expectedResSet.add("10000,30.0,"); + expectedResSet.add("20000,50.0,"); + expectedResSet.add("50000,70.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); + } + } + + @Test + public void testChangingPointProcessor() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) { + // Test empty tsFile parsing + // Assert that an empty tsFile will not be parsed by the processor then block + // the subsequent data processing + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete from root.**"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + processorAttributes.put("processor", "changing-point-sampling-processor"); + processorAttributes.put("processor.changing-point.value-variation", "10"); + processorAttributes.put("processor.changing-point.arrival-time.min-interval", "10000"); + processorAttributes.put("processor.changing-point.arrival-time.max-interval", "30000"); + processorAttributes.put("processor.changing-point.event-time.min-interval", "10000"); + processorAttributes.put("processor.changing-point.event-time.max-interval", "30000"); + + connectorAttributes.put("sink", "iotdb-thrift-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (0, 
10)", + "insert into root.vehicle.d0(time, s1) values (100000, 20)", + "insert into root.vehicle.d0(time, s1) values (110000, 30)"))) { + return; + } + + Thread.sleep(10000); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.vehicle.d0(time, s1) values (100000, 40)", + "insert into root.vehicle.d0(time, s1) values (400000, 50)", + "insert into root.vehicle.d0(time, s1) values (500000, 60)"))) { + return; + } + + Thread.sleep(10000); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.vehicle.d0(time, s1) values (100000, 41)", "flush"))) { + return; + } + + final Set expectedResSet = new HashSet<>(); + + expectedResSet.add("0,10.0,"); + expectedResSet.add("100000,40.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", expectedResSet); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf96..a69fbbcc09297 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; +import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingPointSamplingProcessor; import 
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; @@ -52,7 +52,10 @@ protected void initConstructors() { SwingingDoorTrendingSamplingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); + ChangingPointSamplingProcessor::new); + pluginConstructors.put( + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR.getPipePluginName(), + ChangingPointSamplingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), ThrowingExceptionProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java new file mode 100644 index 0000000000000..012fe2193f526 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling; + +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.time.LocalDate; + +public abstract class DownSamplingFilter implements Accountable { + + protected static final long SIZE_OF_DATE = + RamUsageEstimator.shallowSizeOfInstance(LocalDate.class); + + protected static final long SIZE_OF_LONG = RamUsageEstimator.shallowSizeOfInstance(Long.class); + + protected static final long SIZE_OF_BINARY = + RamUsageEstimator.shallowSizeOfInstance(Binary.class); + + protected boolean isFilteredByArrivalTime = true; + + protected long lastPointArrivalTime; + + protected long lastPointEventTime; + + public DownSamplingFilter( + final long arrivalTime, final long eventTime, final boolean isFilteredByArrivalTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + this.isFilteredByArrivalTime = isFilteredByArrivalTime; + } + + public DownSamplingFilter(final long arrivalTime, final long eventTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + } + + public void reset(final long arrivalTime, final long eventTime) { + this.lastPointArrivalTime = arrivalTime; + this.lastPointEventTime = eventTime; + } + + public long getLastPointArrivalTime() { + return lastPointArrivalTime; + } + + public long getLastPointEventTime() { + return lastPointEventTime; + } + + public boolean isFilteredByArrivalTime() { + return isFilteredByArrivalTime; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index a8e0c270570bb..b198a4e003530 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.EventCollector; @@ -38,6 +39,7 @@ import org.apache.tsfile.common.constant.TsFileConstant; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; @@ -46,13 +48,56 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY; public abstract class DownSamplingProcessor implements PipeProcessor { + + protected static final CurrentTimeFunction currentTime; + + @FunctionalInterface + protected interface CurrentTimeFunction { + long apply(); + } + + static { + switch (TimestampPrecisionUtils.currPrecision) { + case NANOSECONDS: + currentTime = System::nanoTime; + break; + case MICROSECONDS: + currentTime = () -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); + break; + default: + currentTime = System::currentTimeMillis; + } + } + protected long memoryLimitInBytes; protected boolean shouldSplitFile; protected String dataBaseNameWithPathSeparator; - protected PartialPathLastObjectCache pathLastObjectCache; + /** + * The minimum interval of arrival times, in the configured timestamp precision. Represents the + * minimum time interval between the arrival times of two consecutive events. 
+ */ + protected long arrivalTimeMinInterval; + + /** + * The maximum interval of arrival times, in the configured timestamp precision. Represents the + * maximum time interval between the arrival times of two consecutive events. + */ + protected long arrivalTimeMaxInterval; + + /** + * The minimum interval of event times, in the configured timestamp precision. Represents the + * minimum time interval between the event times of two consecutive events. + */ + protected long eventTimeMinInterval; + + /** + * The maximum interval of event times, in the configured timestamp precision. Represents the + * maximum time interval between the event times of two consecutive events. + */ + protected long eventTimeMaxInterval; @Override public void validate(PipeParameterValidator validator) throws Exception { @@ -71,6 +116,50 @@ public void validate(PipeParameterValidator validator) throws Exception { memoryLimitInBytes); } + public void validateTimeInterval(final PipeParameterValidator validator) throws Exception { + validator + .validate( + eventTimeMinInterval -> (long) eventTimeMinInterval >= 0, + String.format( + "%s must be >= 0, but got %s", "event-time.min-interval", eventTimeMinInterval), + eventTimeMinInterval) + .validate( + eventTimeMaxInterval -> (long) eventTimeMaxInterval >= 0, + String.format( + "%s must be >= 0, but got %s", "event-time.max-interval", eventTimeMaxInterval), + eventTimeMaxInterval) + .validate( + minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], + String.format( + "%s must be <= %s, but got %s and %s", + "event-time.min-interval", + "event-time.max-interval", + eventTimeMinInterval, + eventTimeMaxInterval), + eventTimeMinInterval, + eventTimeMaxInterval) + .validate( + arrivalTimeMinInterval -> (long) arrivalTimeMinInterval >= 0, + String.format( + "%s must be >= 0, but got %s", "arrival-time.min-interval", arrivalTimeMinInterval), + arrivalTimeMinInterval) + .validate( + arrivalTimeMaxInterval -> (long) arrivalTimeMaxInterval >= 0, + 
String.format( + "%s must be >= 0, but got %s", 
"arrival-time.max-interval", arrivalTimeMaxInterval), + arrivalTimeMaxInterval) + .validate( + minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], + String.format( + "%s must be <= %s, but got %s and %s", + "arrival-time.min-interval", + "arrival-time.max-interval", + arrivalTimeMinInterval, + arrivalTimeMaxInterval), + arrivalTimeMinInterval, + arrivalTimeMaxInterval); + } + @Override public void customize( PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { @@ -88,11 +177,10 @@ public void customize( .getRegionId())) .getDatabaseName() + TsFileConstant.PATH_SEPARATOR; - - pathLastObjectCache = initPathLastObjectCache(memoryLimitInBytes); + initPathLastObjectCache(memoryLimitInBytes); } - protected abstract PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes); + protected abstract void initPathLastObjectCache(long memoryLimitInBytes); @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) @@ -138,6 +226,37 @@ protected abstract void processRow( String deviceSuffix, AtomicReference exception); + /** + * Determine the arrival time interval and event time interval. + * + * @return true to indicate that the process does not need to be continued and the data can be + * updated directly. False means that the event is discarded directly. Null means that the + * downsampling algorithm can continue to be executed. 
+ */ + protected Boolean filterArrivalTimeAndEventTime( + final DownSamplingFilter filter, final long arrivalTime, final long eventTime) { + final long arrivalTimeInterval = Math.abs(arrivalTime - filter.getLastPointArrivalTime()); + + if (filter.isFilteredByArrivalTime()) { + if (arrivalTimeInterval >= arrivalTimeMaxInterval) { + return Boolean.TRUE; + } + if (arrivalTimeInterval < arrivalTimeMinInterval) { + return Boolean.FALSE; + } + } + + final long eventTimeInterval = Math.abs(eventTime - filter.getLastPointEventTime()); + if (eventTimeInterval >= eventTimeMaxInterval) { + return Boolean.TRUE; + } + + if (eventTimeInterval < eventTimeMinInterval) { + return Boolean.FALSE; + } + return null; + } + /** * If data comes in {@link TsFileInsertionEvent}, we will not split it into {@link * TabletInsertionEvent} by default, because the data in {@link TsFileInsertionEvent} is already @@ -182,11 +301,4 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev public void process(Event event, EventCollector eventCollector) throws Exception { eventCollector.collect(event); } - - @Override - public void close() throws Exception { - if (pathLastObjectCache != null) { - pathLastObjectCache.close(); - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java new file mode 100644 index 0000000000000..5e5e60196ad9b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changing; + +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingFilter; +import org.apache.iotdb.pipe.api.type.Binary; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.time.LocalDate; +import java.util.Objects; + +public class ChangingPointFilter extends DownSamplingFilter { + + private static final long estimatedMemory = + RamUsageEstimator.shallowSizeOfInstance(ChangingPointFilter.class); + + private final long estimatedSize; + + /** + * The maximum absolute difference set by the user. If a numeric value is within + * compressionDeviation of the last stored value, the point is discarded. + */ + private final double compressionDeviation; + + private Object lastStoredValue; + + public ChangingPointFilter( + final long arrivalTime, + final long firstTimestamp, + final Object firstValue, + final double compressionDeviation, + final boolean isFilterArrivalTime) { + super(arrivalTime, firstTimestamp, isFilterArrivalTime); + lastStoredValue = firstValue; + this.compressionDeviation = compressionDeviation; + + if (lastStoredValue instanceof Binary) { + estimatedSize = estimatedMemory + SIZE_OF_BINARY; + } else if (lastStoredValue instanceof LocalDate) { + estimatedSize = estimatedMemory + SIZE_OF_DATE; + } else { + estimatedSize = estimatedMemory + SIZE_OF_LONG; + } + } + + public void 
reset(final long arrivalTime, long firstTimestamp, final Object firstValue) { + reset(arrivalTime, firstTimestamp); + lastStoredValue = firstValue; + } + + public boolean filter(final long arrivalTime, final long timestamp, final Object value) { + try { + return tryFilter(arrivalTime, timestamp, value); + } catch (final Exception e) { + reset(arrivalTime, timestamp, value); + return true; + } + } + + private boolean tryFilter(final long arrivalTime, final long timestamp, final Object value) { + // For non-numerical types, we only compare the value + if (value instanceof Boolean + || value instanceof String + || value instanceof Binary + || value instanceof LocalDate) { + if (Objects.equals(lastStoredValue, value)) { + return false; + } + + reset(arrivalTime, timestamp, value); + return true; + } + + // For other numerical types, we compare the value difference + if (Math.abs( + Double.parseDouble(lastStoredValue.toString()) - Double.parseDouble(value.toString())) + > compressionDeviation) { + reset(arrivalTime, timestamp, value); + return true; + } + + return false; + } + + public long ramBytesUsed() { + return estimatedSize; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java new file mode 100644 index 0000000000000..fe40111cf39fa --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changing; + +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; +import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRow; +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class ChangingPointSamplingProcessor extends DownSamplingProcessor { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ChangingPointSamplingProcessor.class); + + /** + * The maximum 
absolute difference the user set if the data's value is within + * compressionDeviation, it will be compressed and discarded after compression + */ + private double compressionDeviation; + + private boolean isFilteredByArrivalTime = true; + + private PartialPathLastObjectCache pathLastObjectCache; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + super.validate(validator); + + final PipeParameters parameters = validator.getParameters(); + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); + + final boolean isChangingPointProcessor = + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR + .getPipePluginName() + .equals(parameters.getString("processor")); + + if (isChangingPointProcessor) { + isFilteredByArrivalTime = true; + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION_DEFAULT_VALUE); + eventTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + eventTimeMaxInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + arrivalTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + 
.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + arrivalTimeMaxInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant + .PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + } else { + isFilteredByArrivalTime = false; + compressionDeviation = + parameters.getDoubleOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); + eventTimeMinInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + eventTimeMaxInterval = + parameters.getLongOrDefault( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + // Not used for filtering in this mode (isFilteredByArrivalTime is false) + arrivalTimeMinInterval = 0; + arrivalTimeMaxInterval = Long.MAX_VALUE; + } + + validateTimeInterval(validator); + validator.validate( + compressionDeviation -> (Double) compressionDeviation >= 0, + String.format( + "%s must be >= 0, but got %s", + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + compressionDeviation), + compressionDeviation); + } + + @Override + public void customize( + PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { + super.customize(parameters, configuration); + + final boolean isChangingPointProcessor = + BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR + .getPipePluginName() + 
.equals(parameters.getString("processor")); + + if (isChangingPointProcessor) { + 
LOGGER.info( + "ChangingPointSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}, {}: {}, {}: {}.", + dataBaseNameWithPathSeparator, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_VARIATION, + compressionDeviation, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL, + arrivalTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL, + arrivalTimeMaxInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL, + eventTimeMaxInterval); + } else { + LOGGER.info( + "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", + dataBaseNameWithPathSeparator, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, + compressionDeviation, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, + eventTimeMaxInterval); + } + } + + @Override + protected void initPathLastObjectCache(final long memoryLimitInBytes) { + pathLastObjectCache = + new PartialPathLastObjectCache(memoryLimitInBytes) { + @Override + protected long calculateMemoryUsage(ChangingPointFilter object) { + return object.ramBytesUsed(); + } + }; + } + + @Override + protected void processRow( + Row row, + RowCollector rowCollector, + String deviceSuffix, + AtomicReference exception) { + final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + final long currentRowTime = row.getTime(); + final long arrivalTime = currentTime.apply(); + + boolean hasNonNullMeasurements = false; + for (int i = 0, size = row.size(); i < size; i++) { + if (row.isNull(i)) { + continue; + } + + final String timeSeriesSuffix = + deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); + final ChangingPointFilter filter = + 
pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); + + if (Objects.nonNull(filter)) { + final Boolean result = filterArrivalTimeAndEventTime(filter, arrivalTime, currentRowTime); + if (Objects.isNull(result)) { + if (filter.filter(arrivalTime, currentRowTime, row.getObject(i))) { + hasNonNullMeasurements = true; + } else { + remarkableRow.markNull(i); + } + continue; + } + + // It will not be null + if (!result) { + remarkableRow.markNull(i); + continue; + } + + // The arrival time or event time is greater than the maximum time interval + filter.reset(arrivalTime, currentRowTime, row.getObject(i)); + } else { + pathLastObjectCache.setPartialPathLastObject( + timeSeriesSuffix, + new ChangingPointFilter( + arrivalTime, + currentRowTime, + row.getObject(i), + compressionDeviation, + isFilteredByArrivalTime)); + } + + hasNonNullMeasurements = true; + } + + if (hasNonNullMeasurements) { + try { + rowCollector.collectRow(remarkableRow); + } catch (IOException e) { + exception.set(e); + } + } + } + + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java deleted file mode 100644 index 7dc8c87c09b2d..0000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.processor.downsampling.changing; - -import org.apache.iotdb.pipe.api.type.Binary; - -import java.time.LocalDate; -import java.util.Objects; - -public class ChangingValueFilter { - - private final ChangingValueSamplingProcessor processor; - - /** - * The last stored time and value we compare current point against lastReadTimestamp and - * lastReadValue - */ - private long lastStoredTimestamp; - - private T lastStoredValue; - - public ChangingValueFilter( - final ChangingValueSamplingProcessor processor, - final long firstTimestamp, - final T firstValue) { - this.processor = processor; - init(firstTimestamp, firstValue); - } - - private void init(final long firstTimestamp, final T firstValue) { - lastStoredTimestamp = firstTimestamp; - lastStoredValue = firstValue; - } - - public boolean filter(final long timestamp, final T value) { - try { - return tryFilter(timestamp, value); - } catch (final Exception e) { - init(timestamp, value); - return true; - } - } - - private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = Math.abs(timestamp - lastStoredTimestamp); - - if (timeDiff <= processor.getCompressionMinTimeInterval()) { - return false; - } - - if (timeDiff >= processor.getCompressionMaxTimeInterval()) { - reset(timestamp, value); - return true; - } - - // For non-numerical types, we only compare the value - if 
(value instanceof Boolean - || value instanceof String - || value instanceof Binary - || value instanceof LocalDate) { - if (Objects.equals(lastStoredValue, value)) { - return false; - } - - reset(timestamp, value); - return true; - } - - // For other numerical types, we compare the value difference - if (Math.abs( - Double.parseDouble(lastStoredValue.toString()) - Double.parseDouble(value.toString())) - > processor.getCompressionDeviation()) { - reset(timestamp, value); - return true; - } - - return false; - } - - private void reset(final long timestamp, final T value) { - lastStoredTimestamp = timestamp; - lastStoredValue = value; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java deleted file mode 100644 index 0ab08ecb403a9..0000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.db.pipe.processor.downsampling.changing; - -import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; -import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; -import org.apache.iotdb.db.pipe.event.common.row.PipeRow; -import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; -import org.apache.iotdb.pipe.api.access.Row; -import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.collector.RowCollector; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import org.apache.tsfile.common.constant.TsFileConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -@TreeModel -public class ChangingValueSamplingProcessor extends DownSamplingProcessor { - - private static final Logger LOGGER = - LoggerFactory.getLogger(ChangingValueSamplingProcessor.class); - - /** - * The maximum absolute difference the user set if the data's value is within - * compressionDeviation, it will be compressed and discarded after compression - */ - private double compressionDeviation; - - /** - * The minimum time distance between two stored data points if current point time to the last - * stored point time distance <= compressionMinTimeInterval, current point will NOT be stored - * regardless of compression deviation - */ - private long compressionMinTimeInterval; - - /** - * The maximum time distance between two stored data points if current point time to the last - * stored point time distance >= compressionMaxTimeInterval, current point will be stored - * regardless of compression deviation - */ - private long 
compressionMaxTimeInterval; - - private PartialPathLastObjectCache> pathLastObjectCache; - - @Override - public void validate(PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final PipeParameters parameters = validator.getParameters(); - compressionDeviation = - parameters.getDoubleOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE); - compressionMinTimeInterval = - parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE); - compressionMaxTimeInterval = - parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE); - - validator - .validate( - compressionDeviation -> (Double) compressionDeviation >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - compressionDeviation), - compressionDeviation) - .validate( - compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval), - compressionMinTimeInterval) - .validate( - compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval), - compressionMaxTimeInterval) - .validate( - minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], - String.format( - "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - 
compressionMinTimeInterval, - compressionMaxTimeInterval), - compressionMinTimeInterval, - compressionMaxTimeInterval); - } - - @Override - public void customize( - PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { - super.customize(parameters, configuration); - - LOGGER.info( - "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", - dataBaseNameWithPathSeparator, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION, - compressionDeviation, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval); - } - - @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { - pathLastObjectCache = - new PartialPathLastObjectCache>(memoryLimitInBytes) { - @Override - protected long calculateMemoryUsage(ChangingValueFilter object) { - return 64; // Long.BYTES * 8 - } - }; - return pathLastObjectCache; - } - - @Override - protected void processRow( - Row row, - RowCollector rowCollector, - String deviceSuffix, - AtomicReference exception) { - final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); - - boolean hasNonNullMeasurements = false; - for (int i = 0, size = row.size(); i < size; i++) { - if (row.isNull(i)) { - continue; - } - - final String timeSeriesSuffix = - deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); - final ChangingValueFilter filter = - pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); - - if (filter != null) { - if (filter.filter(row.getTime(), row.getObject(i))) { - hasNonNullMeasurements = true; - } else { - remarkableRow.markNull(i); - } - } else { - hasNonNullMeasurements = true; - pathLastObjectCache.setPartialPathLastObject( - timeSeriesSuffix, new ChangingValueFilter<>(this, row.getTime(), row.getObject(i))); - } - } - - if 
(hasNonNullMeasurements) { - try { - rowCollector.collectRow(remarkableRow); - } catch (IOException e) { - exception.set(e); - } - } - } - - double getCompressionDeviation() { - return compressionDeviation; - } - - long getCompressionMinTimeInterval() { - return compressionMinTimeInterval; - } - - long getCompressionMaxTimeInterval() { - return compressionMaxTimeInterval; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index 850cbd3ed0729..ee7e201fa30ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -19,14 +19,27 @@ package org.apache.iotdb.db.pipe.processor.downsampling.sdt; +import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingFilter; import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.time.LocalDate; import java.util.Objects; -public class SwingingDoorTrendingFilter { +public class SwingingDoorTrendingFilter extends DownSamplingFilter { + + private static final long estimatedMemory = + RamUsageEstimator.shallowSizeOfInstance(SwingingDoorTrendingFilter.class); + + private final long estimatedSize; - private final SwingingDoorTrendingSamplingProcessor processor; + /** + * The maximum absolute difference the user set if the data's value is within + * compressionDeviation, it will be compressed and discarded after compression, it will only store + * out of range (time, data) to form the trend + */ + private final double compressionDeviation; /** * The maximum curUpperSlope between the lastStoredPoint to the current point upperDoor can only @@ -46,56 +59,51 @@ public class 
SwingingDoorTrendingFilter { */ private long lastReadTimestamp; - private T lastReadValue; + private Object lastReadValue; - /** - * The last stored time and value we compare current point against lastReadTimestamp and - * lastReadValue - */ - private long lastStoredTimestamp; - - private T lastStoredValue; + private Object lastStoredValue; public SwingingDoorTrendingFilter( - final SwingingDoorTrendingSamplingProcessor processor, - final long firstTimestamp, - final T firstValue) { - this.processor = processor; - init(firstTimestamp, firstValue); + final long arrivalTime, + final long eventTime, + final Object firstValue, + final double compressionDeviation) { + super(arrivalTime, eventTime); + this.lastStoredValue = firstValue; + this.compressionDeviation = compressionDeviation; + if (lastStoredValue instanceof Binary) { + estimatedSize = estimatedMemory + SIZE_OF_BINARY; + } else if (lastStoredValue instanceof LocalDate) { + estimatedSize = estimatedMemory + SIZE_OF_DATE; + } else { + estimatedSize = estimatedMemory + SIZE_OF_LONG; + } } - private void init(final long firstTimestamp, final T firstValue) { + private void init(final long arrivalTime, final long firstTimestamp, final Object firstValue) { upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; lastReadTimestamp = firstTimestamp; lastReadValue = firstValue; - lastStoredTimestamp = firstTimestamp; + lastPointEventTime = firstTimestamp; lastStoredValue = firstValue; + + this.lastPointArrivalTime = arrivalTime; } - public boolean filter(final long timestamp, final T value) { + public boolean filter(final long arrivalTime, final long timestamp, final Object value) { try { - return tryFilter(timestamp, value); + return tryFilter(arrivalTime, timestamp, value); } catch (final Exception e) { - init(timestamp, value); + init(arrivalTime, timestamp, value); return true; } } - private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = timestamp - lastStoredTimestamp; - final 
long absTimeDiff = Math.abs(timeDiff); - - if (absTimeDiff <= processor.getCompressionMinTimeInterval()) { - return false; - } - - if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) { - reset(timestamp, value); - return true; - } + private boolean tryFilter(final long arrivalTime, final long timestamp, final Object value) { + final long timeDiff = timestamp - lastPointEventTime; // For boolean and string type, we only compare the value if (value instanceof Boolean @@ -106,7 +114,7 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } - reset(timestamp, value); + reset(arrivalTime, timestamp, value); return true; } @@ -115,18 +123,18 @@ private boolean tryFilter(final long timestamp, final T value) { final double lastStoredDoubleValue = Double.parseDouble(lastStoredValue.toString()); final double valueDiff = doubleValue - lastStoredDoubleValue; - final double currentUpperSlope = (valueDiff - processor.getCompressionDeviation()) / timeDiff; + final double currentUpperSlope = (valueDiff - compressionDeviation) / timeDiff; if (currentUpperSlope > upperDoor) { upperDoor = currentUpperSlope; } - final double currentLowerSlope = (valueDiff + processor.getCompressionDeviation()) / timeDiff; + final double currentLowerSlope = (valueDiff + compressionDeviation) / timeDiff; if (currentLowerSlope < lowerDoor) { lowerDoor = currentLowerSlope; } if (upperDoor > lowerDoor) { - lastStoredTimestamp = lastReadTimestamp; + lastPointEventTime = lastReadTimestamp; lastStoredValue = lastReadValue; upperDoor = currentUpperSlope; @@ -144,11 +152,15 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } - private void reset(final long timestamp, final T value) { + public void reset(final long arrivalTime, final long timestamp, final Object value) { + super.reset(arrivalTime, timestamp); upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; - lastStoredTimestamp = timestamp; lastStoredValue = value; } + + public 
long ramBytesUsed() { + return estimatedSize; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java index f7e90f749c8d4..3023defed0540 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @TreeModel @@ -51,21 +55,7 @@ public class SwingingDoorTrendingSamplingProcessor extends DownSamplingProcessor */ private double compressionDeviation; - /** - * The minimum time distance between two stored data points if current point time to the last - * stored point time distance <= compressionMinTimeInterval, current point will NOT be stored - * regardless of compression deviation - */ - private long compressionMinTimeInterval; - - /** - * The maximum time distance between two stored data points if current point time to the last - * stored point time distance >= compressionMaxTimeInterval, current point will be stored - * regardless of compression deviation - */ - private 
long compressionMaxTimeInterval; - - private PartialPathLastObjectCache> pathLastObjectCache; + private PartialPathLastObjectCache pathLastObjectCache; @Override public void validate(PipeParameterValidator validator) throws Exception { @@ -76,47 +66,36 @@ public void validate(PipeParameterValidator validator) throws Exception { parameters.getDoubleOrDefault( PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE); - compressionMinTimeInterval = + eventTimeMinInterval = + parameters.getLongOrDefault( + Arrays.asList( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY), + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + eventTimeMaxInterval = + parameters.getLongOrDefault( + Arrays.asList( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, + PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY), + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + arrivalTimeMinInterval = parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE); - compressionMaxTimeInterval = + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + arrivalTimeMaxInterval = parameters.getLongOrDefault( - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE); - - validator - .validate( - compressionDeviation -> (Double) compressionDeviation >= 0, - String.format( - "%s must be >= 0, but got 
%s", - PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, - compressionDeviation), - compressionDeviation) - .validate( - compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval), - compressionMinTimeInterval) - .validate( - compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 0, - String.format( - "%s must be >= 0, but got %s", - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval), - compressionMaxTimeInterval) - .validate( - minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1], - String.format( - "%s must be <= %s, but got %s and %s", - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - compressionMaxTimeInterval), - compressionMinTimeInterval, - compressionMaxTimeInterval); + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL, + TimestampPrecisionUtils.convertToCurrPrecision( + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE, + TimeUnit.MILLISECONDS)); + + validateTimeInterval(validator); } @Override @@ -125,26 +104,29 @@ public void customize( super.customize(parameters, configuration); LOGGER.info( - "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}.", + "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}: {}, {}: {}, {}: {}, {}: {}, {}: {}.", dataBaseNameWithPathSeparator, PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY, compressionDeviation, - PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY, - compressionMinTimeInterval, - PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY, - compressionMaxTimeInterval); + PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL, + arrivalTimeMinInterval, + 
PipeProcessorConstant.PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL, + arrivalTimeMaxInterval, + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL, + eventTimeMinInterval, + PipeProcessorConstant.PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL, + eventTimeMaxInterval); } @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { + protected void initPathLastObjectCache(final long memoryLimitInBytes) { pathLastObjectCache = - new PartialPathLastObjectCache>(memoryLimitInBytes) { + new PartialPathLastObjectCache(memoryLimitInBytes) { @Override - protected long calculateMemoryUsage(SwingingDoorTrendingFilter object) { - return 64; // Long.BYTES * 8 + protected long calculateMemoryUsage(SwingingDoorTrendingFilter object) { + return object.ramBytesUsed(); } }; - return pathLastObjectCache; } @Override @@ -154,6 +136,8 @@ protected void processRow( String deviceSuffix, AtomicReference exception) { final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + final long currentRowTime = row.getTime(); + final long arrivalTime = currentTime.apply(); boolean hasNonNullMeasurements = false; for (int i = 0, size = row.size(); i < size; i++) { @@ -166,18 +150,33 @@ protected void processRow( final SwingingDoorTrendingFilter filter = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); - if (filter != null) { - if (filter.filter(row.getTime(), row.getObject(i))) { - hasNonNullMeasurements = true; - } else { + if (Objects.nonNull(filter)) { + final Boolean result = filterArrivalTimeAndEventTime(filter, arrivalTime, currentRowTime); + if (Objects.isNull(result)) { + if (filter.filter(arrivalTime, currentRowTime, row.getObject(i))) { + hasNonNullMeasurements = true; + } else { + remarkableRow.markNull(i); + } + continue; + } + + // It will not be null + if (!result) { remarkableRow.markNull(i); + continue; } + + // The arrival time or event time is greater than the maximum time interval + filter.reset(arrivalTime, 
currentRowTime, row.getObject(i)); } else { - hasNonNullMeasurements = true; pathLastObjectCache.setPartialPathLastObject( timeSeriesSuffix, - new SwingingDoorTrendingFilter<>(this, row.getTime(), row.getObject(i))); + new SwingingDoorTrendingFilter( + arrivalTime, currentRowTime, row.getObject(i), compressionDeviation)); } + + hasNonNullMeasurements = true; } if (hasNonNullMeasurements) { @@ -189,15 +188,10 @@ protected void processRow( } } - double getCompressionDeviation() { - return compressionDeviation; - } - - long getCompressionMinTimeInterval() { - return compressionMinTimeInterval; - } - - long getCompressionMaxTimeInterval() { - return compressionMaxTimeInterval; + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java index a7411aada3efa..80e739728e0de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java @@ -87,7 +87,7 @@ public void customize( } @Override - protected PartialPathLastObjectCache initPathLastObjectCache(long memoryLimitInBytes) { + protected void initPathLastObjectCache(final long memoryLimitInBytes) { pathLastObjectCache = new PartialPathLastObjectCache(memoryLimitInBytes) { @Override @@ -95,7 +95,6 @@ protected long calculateMemoryUsage(Long object) { return Long.BYTES; } }; - return pathLastObjectCache; } @Override @@ -134,4 +133,11 @@ protected void processRow( } } } + + @Override + public void close() throws Exception { + if (pathLastObjectCache != null) { + pathLastObjectCache.close(); + 
} + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 74bc0d9815aa8..6fc7888284360 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingPointSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; @@ -65,7 +65,9 @@ public enum BuiltinPipePlugin { "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), + "changing-value-sampling-processor", ChangingPointSamplingProcessor.class), + CHANGING_POINT_SAMPLING_PROCESSOR( + "changing-point-sampling-processor", ChangingPointSamplingProcessor.class), 
THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), @@ -151,6 +153,7 @@ public String getClassName() { TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), + CHANGING_POINT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java similarity index 95% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java index 0cb723dca8cc2..2215a202e8bdd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/downsampling/ChangingPointSamplingProcessor.java @@ -27,4 +27,4 @@ * be imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the changing-value-sampling-processor. 
*/ -public class ChangingValueSamplingProcessor extends PlaceHolderProcessor {} +public class ChangingPointSamplingProcessor extends PlaceHolderProcessor {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 3874be8e817f7..b286633ae1447 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -61,10 +61,20 @@ public class PipeProcessorConstant { public static final double PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE = 0; public static final String PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY = "processor.sdt.min-time-interval"; - public static final long PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0; public static final String PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY = "processor.sdt.max-time-interval"; - public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; + public static final String PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL = + "processor.sdt.arrival-time.min-interval"; + public static final String PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL = + "processor.sdt.arrival-time.max-interval"; + public static final String PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL = + "processor.sdt.event-time.min-interval"; + public static final String PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL = + "processor.sdt.event-time.max-interval"; + public static final long PROCESSOR_SDT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_SDT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final long PROCESSOR_SDT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_SDT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; public static 
final String PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION = "processor.changing-value.compression-deviation"; @@ -77,6 +87,22 @@ public class PipeProcessorConstant { public static final long PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; + public static final String PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL = + "processor.changing-point.arrival-time.min-interval"; + public static final String PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL = + "processor.changing-point.arrival-time.max-interval"; + public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL = + "processor.changing-point.event-time.min-interval"; + public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL = + "processor.changing-point.event-time.max-interval"; + public static final String PROCESSOR_CHANGING_POINT_VALUE_VARIATION = + "processor.changing-point.value-variation"; + public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE = 0; + public static final long PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE = 60000; + public static final double PROCESSOR_CHANGING_POINT_VALUE_VARIATION_DEFAULT_VALUE = 0; + public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series";