diff --git a/caffe2/python/operator_test/rebatching_queue_test.py b/caffe2/python/operator_test/rebatching_queue_test.py
new file mode 100644
index 00000000000..af207bedc69
--- /dev/null
+++ b/caffe2/python/operator_test/rebatching_queue_test.py
@@ -0,0 +1,299 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+from caffe2.python import core, workspace
+from caffe2.python.test_util import TestCase
+
+import numpy as np
+import numpy.testing as npt
+
+from hypothesis import given
+import hypothesis.strategies as st
+
+import functools
+
+
+def primefac(n):
+    """Return the prime factors of `n` in non-decreasing order.
+
+    Trial division is used, so a repeated factor appears once per power
+    (e.g. 12 -> [2, 2, 3]). For n < 2 the result is an empty list.
+    """
+    ret = []
+    divisor = 2
+    while divisor * divisor <= n:
+        while n % divisor == 0:
+            ret.append(divisor)
+            n //= divisor
+        divisor += 1
+    # Whatever remains after removing all factors <= sqrt(n) is itself prime.
+    if n > 1:
+        ret.append(n)
+    return ret
+
+
+class TestReBatchingQueue(TestCase):
+    """Tests for the Create/Enqueue/Dequeue/CloseRebatchingQueue operators."""
+
+    def test_rebatching_queue_single_enqueue_dequeue(self):
+        """Elements enqueued one at a time can be dequeued one at a time."""
+        net = core.Net('net')
+
+        tensors = [
+            net.ConstantFill([], 1, value=1.0, run_once=False)
+            for _ in range(3)
+        ]
+
+        queue = net.CreateRebatchingQueue([], 1, capacity=10, num_blobs=1)
+
+        net.EnqueueRebatchingQueue([queue, tensors[0]], [])
+        net.EnqueueRebatchingQueue([queue, tensors[1]], [])
+        net.EnqueueRebatchingQueue([queue, tensors[2]], [])
+
+        results = [
+            net.DequeueRebatchingQueue([queue], 1),
+            net.DequeueRebatchingQueue([queue], 1),
+            net.DequeueRebatchingQueue([queue], 1),
+        ]
+
+        workspace.RunNetOnce(net)
+
+        for result in results:
+            self.assertEqual(workspace.FetchBlob(result), [1.0])
+
+    def test_rebatching_queue_multi_enqueue_dequeue(self):
+        """A 10-element batch is split on enqueue and re-batched in fives."""
+        net = core.Net('net')
+        workspace.FeedBlob("tensors", np.arange(10, dtype=np.int32))
+
+        queue = net.CreateRebatchingQueue([], 1, capacity=10, num_blobs=1)
+
+        net.EnqueueRebatchingQueue([queue, "tensors"], [], enqueue_batch=True)
+
+        results = [
+            net.DequeueRebatchingQueue([queue], 1, num_elements=5),
+            net.DequeueRebatchingQueue([queue], 1, num_elements=5),
+        ]
+
+        workspace.RunNetOnce(net)
+
+        npt.assert_array_equal(
+            workspace.FetchBlob(results[0]), workspace.FetchBlob("tensors")[:5]
+        )
+        npt.assert_array_equal(
+            workspace.FetchBlob(results[1]), workspace.FetchBlob("tensors")[5:]
+        )
+
+    def test_rebatching_queue_closes_properly(self):
+        """Queued data stays readable after close; further use fails."""
+        net = core.Net('net')
+        workspace.FeedBlob("tensors", np.arange(10, dtype=np.int32))
+
+        queue = net.CreateRebatchingQueue([], 1, capacity=10, num_blobs=1)
+
+        net.EnqueueRebatchingQueue([queue, "tensors"], 0, enqueue_batch=True)
+
+        net.CloseRebatchingQueue([queue], 0)
+
+        results = [
+            net.DequeueRebatchingQueue([queue], 1, num_elements=5),
+            net.DequeueRebatchingQueue([queue], 1, num_elements=5),
+        ]
+
+        workspace.RunNetOnce(net)
+
+        npt.assert_array_equal(
+            workspace.FetchBlob(results[0]), workspace.FetchBlob("tensors")[:5]
+        )
+        npt.assert_array_equal(
+            workspace.FetchBlob(results[1]), workspace.FetchBlob("tensors")[5:]
+        )
+
+        # Enqueuing more should fail now since the queue is closed
+        net.EnqueueRebatchingQueue([queue, "tensors"], [], enqueue_batch=True)
+
+        with self.assertRaises(RuntimeError):
+            workspace.RunNetOnce(net)
+
+        # Dequeuing more should fail now since the queue is closed.
+        # NOTE(review): `net` still contains the enqueue added above, so this
+        # run presumably already fails there, before the new dequeue executes.
+        net.DequeueRebatchingQueue([queue], 1, num_elements=5)
+
+        with self.assertRaises(RuntimeError):
+            workspace.RunNetOnce(net)
+
+    def test_rebatching_queue_multiple_components(self):
+        """Every component of a multi-blob element is batched consistently."""
+        NUM_BLOBS = 4
+        NUM_ELEMENTS = 10
+
+        net = core.Net('net')
+
+        workspace.blobs['complex_tensor'] = np.array(
+            [[x, x + 1] for x in range(NUM_ELEMENTS)], dtype=np.int32
+        )
+
+        tensors = [
+            net.GivenTensorIntFill(
+                [],
+                1,
+                shape=[NUM_ELEMENTS],
+                values=list(range(NUM_ELEMENTS))
+            ),
+            net.GivenTensorFill(
+                [],
+                1,
+                shape=[NUM_ELEMENTS],
+                values=[x * 1.0 for x in range(NUM_ELEMENTS)]
+            ),
+            net.GivenTensorBoolFill(
+                [],
+                1,
+                shape=[NUM_ELEMENTS],
+                values=[(x % 2 == 0) for x in range(NUM_ELEMENTS)]
+            ),
+            'complex_tensor',
+        ]
+
+        queue = net.CreateRebatchingQueue(
+            [], 1, capacity=10, num_blobs=NUM_BLOBS
+        )
+
+        net.EnqueueRebatchingQueue([queue] + tensors, [], enqueue_batch=True)
+
+        results = net.DequeueRebatchingQueue([queue], NUM_BLOBS, num_elements=5)
+
+        workspace.RunNetOnce(net)
+
+        for idx in range(NUM_BLOBS):
+            npt.assert_array_equal(
+                workspace.FetchBlob(results[idx]),
+                workspace.FetchBlob(tensors[idx])[:5]
+            )
+
+    @given(
+        num_producers=st.integers(1, 5),
+        num_consumers=st.integers(1, 5),
+        producer_input_size=st.integers(1, 10),
+        producer_num_iterations=st.integers(1, 10),
+        capacity=st.integers(1, 10)
+    )
+    def test_rebatching_parallel_producer_consumer(
+        self, num_producers, num_consumers, producer_input_size,
+        producer_num_iterations, capacity
+    ):
+        """Concurrent producers and consumers move every element exactly once.
+
+        The dequeued elements must be a permutation of the enqueued ones.
+        """
+        ### Init ###
+        total_inputs = producer_num_iterations * producer_input_size * num_producers
+        inputs = []
+        init_net = core.Net('init_net')
+        queue = init_net.CreateRebatchingQueue(
+            [], 1, capacity=capacity, num_blobs=1
+        )
+
+        ### Producers ###
+        producer_steps = []
+        for i in range(num_producers):
+            name = 'producer_%d' % i
+            net = core.Net(name)
+            values = [
+                producer_input_size * i + x for x in range(producer_input_size)
+            ]
+            for _ in range(producer_num_iterations):
+                inputs.extend(values)
+            tensors = net.GivenTensorIntFill(
+                [], 1, shape=[producer_input_size], values=values
+            )
+
+            net.EnqueueRebatchingQueue([queue, tensors], [], enqueue_batch=True)
+
+            step = core.execution_step(
+                name, net, num_iter=producer_num_iterations
+            )
+            producer_steps.append(step)
+
+        producer_step = core.execution_step(
+            'producer', [
+                core.execution_step(
+                    'producers', producer_steps, concurrent_substeps=True
+                )
+            ]
+        )
+
+        ### Consumers ###
+        outputs = []
+
+        def append(ins, outs):
+            # Extend is atomic
+            outputs.extend(ins[0].data.tolist())
+
+        consumer_steps = []
+        for i in range(num_consumers):
+            # This is just a way of deterministically reading all the elements.
+            # We make `num_consumers` almost equal splits
+            # (the remainder goes to the last consumer).
+            num_elements_to_read = total_inputs // num_consumers
+            if i == num_consumers - 1:
+                num_elements_to_read += total_inputs % num_consumers
+
+            # If we have nothing to read this consumer will be idle
+            if num_elements_to_read == 0:
+                continue
+
+            # Now we have to make a split on number of iterations and the read
+            # size for each iteration. This is again just one of many
+            # deterministic ways of doing it. We factorize the total number of
+            # elements we have to read and assign half of the factors to the
+            # iterations and half to the read size.
+            factors = primefac(num_elements_to_read)
+            half = len(factors) // 2
+
+            num_elements_per_iteration = functools.reduce(
+                lambda x, y: x * y, factors[half:], 1
+            )
+
+            num_iterations = functools.reduce(
+                lambda x, y: x * y, factors[:half], 1
+            )
+
+            name = 'consumer_%d' % i
+            net = core.Net(name)
+            blobs = net.DequeueRebatchingQueue(
+                [queue], 1, num_elements=num_elements_per_iteration
+            )
+            net.Python(append)([blobs], 0)
+            consumer_steps.append(
+                core.execution_step(name, net, num_iter=num_iterations)
+            )
+
+        consumer_step = core.execution_step(
+            'consumer', consumer_steps, concurrent_substeps=True
+        )
+
+        init_step = core.execution_step('init', init_net)
+        worker_step = core.execution_step(
+            'worker', [consumer_step, producer_step], concurrent_substeps=True
+        )
+
+        ### Execute Plan ###
+        plan = core.Plan('test')
+        plan.AddStep(init_step)
+        plan.AddStep(worker_step)
+
+        self.ws.run(plan)
+
+        ### Check Results ###
+        # We check that the outputs are a permutation of inputs
+        inputs.sort()
+        outputs.sort()
+        self.assertEqual(inputs, outputs)
+
+
+if __name__ == "__main__":
+    import unittest
+    unittest.main()
diff --git a/caffe2/queue/rebatching_queue.cc b/caffe2/queue/rebatching_queue.cc
new file mode 100644
index 00000000000..0cf5e19e1fa
--- /dev/null
+++ b/caffe2/queue/rebatching_queue.cc
@@ -0,0
+1,239 @@
+#include "rebatching_queue.h"
+
+namespace caffe2 {
+
+namespace {
+
+// Stacks queue elements into batched outputs. `inputs` holds one entry per
+// queue element, each with one tensor per component; `outputs` receives one
+// tensor per component. A new first dimension of size inputs.size() is always
+// created, so all elements must agree in type and shape, per component.
+void concat(
+    CPUContext& context,
+    const std::vector<std::vector<TensorCPU>>& inputs,
+    const std::vector<TensorCPU*>& outputs) {
+  CAFFE_ENFORCE(!inputs.empty());
+
+  const auto& inputZero = inputs[0];
+  const auto numTensors = inputZero.size();
+  const auto numRows = inputs.size();
+
+  // Guard the outputs[i] accesses below against too few output tensors.
+  CAFFE_ENFORCE_GE(outputs.size(), numTensors);
+
+  // Precompute the output sizes to avoid resizing
+  std::vector<std::vector<TIndex>> outputDims(numTensors);
+
+  for (size_t i = 0; i < numTensors; ++i) {
+    outputDims[i] = inputZero.at(i).dims();
+    outputDims[i].insert(outputDims[i].begin(), numRows);
+  }
+
+  // Resize to the final output size
+  std::vector<void*> destinations(numTensors);
+  for (size_t i = 0; i < numTensors; ++i) {
+    outputs[i]->Resize(outputDims[i]);
+    destinations[i] = outputs[i]->raw_mutable_data(inputZero[i].meta());
+  }
+
+  for (size_t i = 0; i < numRows; ++i) {
+    CAFFE_ENFORCE_EQ(inputs[i].size(), numTensors);
+
+    for (size_t j = 0; j < numTensors; ++j) {
+      const auto& input = inputs[i][j];
+
+      CAFFE_ENFORCE(inputZero[j].meta() == input.meta());
+      CAFFE_ENFORCE_EQ(inputZero[j].itemsize(), input.itemsize());
+      CAFFE_ENFORCE_EQ(inputZero[j].ndim(), input.ndim());
+      for (int k = 0; k < input.ndim(); ++k) {
+        CAFFE_ENFORCE_EQ(input.dims()[k], inputZero[j].dims()[k]);
+      }
+
+      // Skip empty tensors
+      if (input.size() == 0) {
+        continue;
+      }
+
+      context.CopyItems(
+          input.meta(),
+          input.size(),
+          input.raw_data() /* src */,
+          destinations[j] /* dst */);
+
+      // Advance component j's write cursor past the row just copied.
+      destinations[j] = static_cast<char*>(destinations[j]) +
+          input.size() * input.itemsize();
+    }
+  }
+}
+
+// Splits every input tensor along its first dimension. Element i of the
+// result holds row i of each input, in input order, so all inputs must have
+// the same size in their first dimension.
+auto split(CPUContext& context, const std::vector<const TensorCPU*>& inputs) {
+  CAFFE_ENFORCE(!inputs.empty());
+  // Validate the first input before it is dereferenced for the row count.
+  CAFFE_ENFORCE(inputs[0]);
+  CAFFE_ENFORCE(!inputs[0]->dims().empty());
+
+  const auto outputSize = inputs[0]->dims().at(0);
+  std::vector<std::vector<TensorCPU>> outputs(outputSize);
+
+  for (const auto* inputPtr : inputs) {
+    CAFFE_ENFORCE(inputPtr);
+
+    const auto& input = *inputPtr;
+
+    auto outputDims = input.dims();
+    CAFFE_ENFORCE(!outputDims.empty());
+    outputDims.erase(outputDims.begin());
+    CAFFE_ENFORCE_EQ(input.dims().at(0), outputSize);
+
+    const auto innerSize = input.size_from_dim(1);
+    const auto itemSize = input.meta().itemsize();
+    const auto* src = static_cast<const char*>(input.raw_data());
+
+    for (TIndex i = 0; i < outputSize; ++i) {
+      outputs[i].push_back(TensorCPU(outputDims));
+      context.CopyItems(
+          input.meta(),
+          innerSize,
+          src + i * innerSize * itemSize /* src */,
+          outputs[i].back().raw_mutable_data(input.meta()) /* dst */);
+    }
+  }
+
+  return outputs;
+}
+} // anonymous namespace
+
+RebatchingQueue::RebatchingQueue(size_t capacity, size_t numBlobs)
+    : capacity_(capacity), numBlobs_(numBlobs), queue_(capacity) {}
+
+RebatchingQueue::~RebatchingQueue() {
+  close();
+}
+
+bool RebatchingQueue::canRead() const {
+  return tail_ < head_;
+}
+
+bool RebatchingQueue::dequeue(
+    CPUContext& context,
+    size_t numElements,
+    const std::vector<TensorCPU*>& outputs) {
+  std::vector<std::vector<TensorCPU>> results;
+  results.reserve(numElements);
+
+  while (results.size() < numElements) {
+    {
+      std::unique_lock<std::mutex> lock(mutex_);
+
+      cvEmpty_.wait(lock, [this] { return canRead() || isClosed_; });
+
+      // We only want to stop reading if the queue is empty and closed
+      if (!canRead() && isClosed_) {
+        break;
+      }
+
+      do {
+        results.push_back(std::move(queue_[tail_++ % capacity()]));
+      } while (canRead() && results.size() < numElements);
+    }
+
+    // Slots were freed: wake blocked producers (outside of the lock).
+    if (numElements == 1) {
+      cvOverflow_.notify_one();
+    } else {
+      cvOverflow_.notify_all();
+    }
+  }
+
+  if (results.empty()) {
+    return false;
+  }
+
+  concat(context, results, outputs);
+
+  return true;
+}
+
+bool RebatchingQueue::canWrite() const {
+  return tail_ + capacity() > head_;
+}
+
+bool RebatchingQueue::enqueueOne(
+    CPUContext& context,
+    const std::vector<const TensorCPU*>& inputs) {
+  // Same component-count contract as enqueueMany.
+  CAFFE_ENFORCE_EQ(numBlobs_, inputs.size());
+
+  std::vector<std::vector<TensorCPU>> splittedInputs;
+  splittedInputs.emplace_back();
+  auto& tensorVector = splittedInputs.back();
+  tensorVector.reserve(inputs.size());
+  for (const auto* tensorPtr : inputs) {
+    CAFFE_ENFORCE(tensorPtr);
+    tensorVector.push_back(*tensorPtr);
+  }
+
+  return enqueue(std::move(splittedInputs));
+}
+
+bool RebatchingQueue::enqueueMany(
+    CPUContext& context,
+    const std::vector<const TensorCPU*>& inputs) {
+  CAFFE_ENFORCE_EQ(numBlobs_, inputs.size());
+
+  return enqueue(split(context, inputs));
+}
+
+bool RebatchingQueue::enqueue(
+    std::vector<std::vector<TensorCPU>> splittedInputs) {
+  size_t idx = 0;
+  while (idx < splittedInputs.size()) {
+    {
+      std::unique_lock<std::mutex> lock(mutex_);
+
+      cvOverflow_.wait(lock, [this] { return canWrite() || isClosed_; });
+
+      if (isClosed_) {
+        // The queue was closed before the entire batch was applied; a close
+        // in the middle of enqueuing is treated as a non-success.
+        return false;
+      }
+
+      do {
+        queue_[head_++ % capacity()] = std::move(splittedInputs[idx++]);
+      } while (canWrite() && idx < splittedInputs.size());
+    }
+
+    cvEmpty_.notify_all();
+  }
+
+  return true;
+}
+
+size_t RebatchingQueue::capacity() const {
+  return capacity_;
+}
+
+size_t RebatchingQueue::numBlobs() const {
+  return numBlobs_;
+}
+
+bool RebatchingQueue::isClosed() const {
+  std::lock_guard<std::mutex> g(mutex_);
+  return isClosed_;
+}
+
+void RebatchingQueue::close() {
+  {
+    std::lock_guard<std::mutex> g(mutex_);
+    isClosed_ = true;
+  }
+
+  cvEmpty_.notify_all();
+  cvOverflow_.notify_all();
+}
+} // caffe2
diff --git a/caffe2/queue/rebatching_queue.h b/caffe2/queue/rebatching_queue.h
new file mode 100644
index 00000000000..052402e02fe
--- /dev/null
+++ b/caffe2/queue/rebatching_queue.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include "caffe2/core/logging.h"
+#include "caffe2/core/operator.h"
+#include "caffe2/core/stats.h"
+#include "caffe2/core/tensor.h"
+
+namespace caffe2 {
+
+// TODO: This is a very naive implementation with a single mutex. We can do the
+// atomic index + circular queue optimizations or pull something more
+// heavy-weight later
+
+// A bounded queue of elements, each element being `numBlobs` tensors. Batches
+// can be split into elements on enqueue and elements are concatenated back
+// into batches of a caller-chosen size on dequeue. All state is guarded by
+// a single mutex.
+class RebatchingQueue {
+ public:
+  RebatchingQueue(size_t capacity, size_t numBlobs);
+
+  ~RebatchingQueue();
+
+  // Enqueues `inputs` as one element. Blocks while the queue is full and
+  // returns false if the queue is (or gets) closed.
+  bool enqueueOne(
+      CPUContext& context,
+      const std::vector<const TensorCPU*>& inputs);
+
+  // Splits `inputs` along the first dimension and enqueues every row as a
+  // separate element. Returns false if the queue is closed before all rows
+  // have been enqueued.
+  bool enqueueMany(
+      CPUContext& context,
+      const std::vector<const TensorCPU*>& inputs);
+
+  // Dequeues up to `numElements` elements and concatenates them into
+  // `outputs`. Fewer elements are returned only if the queue is closed;
+  // returns false if nothing could be dequeued.
+  bool dequeue(
+      CPUContext& context,
+      size_t numElements,
+      const std::vector<TensorCPU*>& outputs);
+
+  size_t capacity() const;
+
+  size_t numBlobs() const;
+
+  bool isClosed() const;
+
+  // Marks the queue closed and wakes up all blocked producers and consumers.
+  void close();
+
+ private:
+  bool enqueue(std::vector<std::vector<TensorCPU>> splittedInputs);
+
+  bool canWrite() const;
+  bool canRead() const;
+
+  const size_t capacity_;
+  const size_t numBlobs_;
+
+  mutable std::mutex mutex_;
+
+  bool isClosed_{false};
+
+  // Monotonically increasing write/read positions; the slot used is the
+  // position modulo capacity_.
+  uint64_t head_{0};
+  uint64_t tail_{0};
+
+  // Signalled when elements become available / when slots are freed.
+  std::condition_variable cvEmpty_;
+  std::condition_variable cvOverflow_;
+
+  std::vector<std::vector<TensorCPU>> queue_;
+};
+} // caffe2
diff --git a/caffe2/queue/rebatching_queue_ops.cc b/caffe2/queue/rebatching_queue_ops.cc
new file mode 100644
index 00000000000..f31cf985070
--- /dev/null
+++ b/caffe2/queue/rebatching_queue_ops.cc
@@ -0,0 +1,74 @@
+#include "rebatching_queue_ops.h"
+
+#include <climits>
+
+namespace caffe2 {
+
+CAFFE_KNOWN_TYPE(RebatchingQueuePtr);
+
+namespace {
+
+REGISTER_CPU_OPERATOR(CreateRebatchingQueue, CreateRebatchingQueueOp);
+REGISTER_CPU_OPERATOR(EnqueueRebatchingQueue, EnqueueRebatchingQueueOp);
+REGISTER_CPU_OPERATOR(DequeueRebatchingQueue, DequeueRebatchingQueueOp);
+REGISTER_CPU_OPERATOR(CloseRebatchingQueue, CloseRebatchingQueueOp);
+
+NO_GRADIENT(CreateRebatchingQueue);
+NO_GRADIENT(EnqueueRebatchingQueue);
+NO_GRADIENT(DequeueRebatchingQueue);
+NO_GRADIENT(CloseRebatchingQueue);
+
+OPERATOR_SCHEMA(CreateRebatchingQueue)
+    .NumInputs(0)
+    .NumOutputs(1)
+    .SetDoc(R"DOC(
+Creates the Queue.
+)DOC")
+    .Output(0, "queue", "object representing the queue")
+    .Arg("num_blobs", "Number of input tensors the queue will support")
+    .Arg(
+        "capacity",
+        "Maximal number of elements the queue can hold at any given point");
+
+OPERATOR_SCHEMA(CloseRebatchingQueue)
+    .NumInputs(1)
+    .NumOutputs(0)
+    .SetDoc(R"DOC(
+Closes the Queue.
+)DOC")
+    .Input(0, "queue", "object representing the queue");
+
+OPERATOR_SCHEMA(EnqueueRebatchingQueue)
+    .NumInputs(2, INT_MAX)
+    .NumOutputs(0)
+    .SetDoc(R"DOC(
+Enqueues Tensors into the queue.
+Number of input tensors should be equal to the number of components passed
+during creation of the queue.
+If the Queue is closed this operation will fail.
+If the enqueue_batch argument is set, the input tensors are split along the
+first dimension to produce single queue elements.
+)DOC")
+    .Input(0, "queue", "object representing the queue")
+    .Input(1, "tensor", "First tensor to enqueue")
+    .Arg(
+        "enqueue_batch",
+        "Are we enqueuing a batch or just a single element. "
+        "By default we enqueue single element.");
+
+OPERATOR_SCHEMA(DequeueRebatchingQueue)
+    .NumInputs(1)
+    .NumOutputs(1, INT_MAX)
+    .SetDoc(R"DOC(
+Dequeue Tensors from the Queue.
+If the Queue is closed this might return fewer elements than asked.
+The dequeued elements are concatenated along a new first dimension into one
+tensor per component (also when num_elements is 1).
+)DOC")
+    .Input(0, "rebatching_queue", "object representing the queue")
+    .Output(0, "tensor", "First dequeued tensor")
+    .Arg(
+        "num_elements",
+        "Number of elements to dequeue. By default we dequeue one element.");
+} // anonymous namespace
+} // caffe2
diff --git a/caffe2/queue/rebatching_queue_ops.h b/caffe2/queue/rebatching_queue_ops.h
new file mode 100644
index 00000000000..80749a42692
--- /dev/null
+++ b/caffe2/queue/rebatching_queue_ops.h
@@ -0,0 +1,101 @@
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "rebatching_queue.h"
+
+namespace caffe2 {
+
+using RebatchingQueuePtr = std::unique_ptr<RebatchingQueue>;
+
+// Creates a RebatchingQueue and stores it in output 0.
+class CreateRebatchingQueueOp : public Operator<CPUContext> {
+ public:
+  CreateRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
+      : Operator<CPUContext>(operator_def, ws) {}
+
+  bool RunOnDevice() override {
+    // Validate as signed ints first: the queue takes size_t, so a negative
+    // argument would otherwise silently become a huge value.
+    const int capacity = OperatorBase::GetSingleArgument<int>("capacity", 1);
+    const int numBlobs = OperatorBase::GetSingleArgument<int>("num_blobs", 1);
+    CAFFE_ENFORCE_GT(capacity, 0, "capacity must be positive");
+    CAFFE_ENFORCE_GT(numBlobs, 0, "num_blobs must be positive");
+
+    *OperatorBase::Output<RebatchingQueuePtr>(0) =
+        RebatchingQueuePtr(new RebatchingQueue(capacity, numBlobs));
+    return true;
+  }
+};
+
+// Enqueues inputs 1..N into the queue given as input 0, either as a single
+// element or, when `enqueue_batch` is set, split along the first dimension.
+class EnqueueRebatchingQueueOp : public Operator<CPUContext> {
+ public:
+  EnqueueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
+      : Operator<CPUContext>(operator_def, ws),
+        enqueueBatch_(
+            OperatorBase::GetSingleArgument<bool>("enqueue_batch", false)) {}
+
+  bool RunOnDevice() override {
+    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
+    CAFFE_ENFORCE(queue);
+    CAFFE_ENFORCE_EQ(InputSize(), queue->numBlobs() + 1);
+    std::vector<const TensorCPU*> inputTensors;
+    inputTensors.reserve(InputSize() - 1);
+    for (int i = 1; i < InputSize(); ++i) {
+      inputTensors.push_back(&Input(i));
+    }
+
+    return enqueueBatch_ ? queue->enqueueMany(context_, inputTensors)
+                         : queue->enqueueOne(context_, inputTensors);
+  }
+
+ private:
+  const bool enqueueBatch_;
+};
+
+// Dequeues up to `num_elements` elements from the queue given as input 0 and
+// writes them, concatenated per component, to the outputs.
+class DequeueRebatchingQueueOp : public Operator<CPUContext> {
+ public:
+  DequeueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
+      : Operator<CPUContext>(operator_def, ws),
+        numElements_(OperatorBase::GetSingleArgument<int>("num_elements", 1)) {
+    // dequeue() takes a size_t, so reject non-positive counts up front.
+    CAFFE_ENFORCE_GT(numElements_, 0, "num_elements must be positive");
+  }
+
+  bool RunOnDevice() override {
+    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
+    CAFFE_ENFORCE(queue);
+
+    std::vector<TensorCPU*> outputTensors;
+    outputTensors.reserve(OutputSize());
+    for (int i = 0; i < OutputSize(); ++i) {
+      outputTensors.push_back(Output(i));
+    }
+
+    return queue->dequeue(context_, numElements_, outputTensors);
+  }
+
+ private:
+  int numElements_;
+};
+
+// Closes the queue given as input 0.
+class CloseRebatchingQueueOp : public Operator<CPUContext> {
+ public:
+  CloseRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
+      : Operator<CPUContext>(operator_def, ws) {}
+
+  bool RunOnDevice() override {
+    CAFFE_ENFORCE_EQ(InputSize(), 1);
+    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
+    CAFFE_ENFORCE(queue);
+    queue->close();
+    return true;
+  }
+};
+} // caffe2